diff --git a/CHANGELOG.md b/CHANGELOG.md index 142d0a7c..2bab97f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- `pw.io.s3.read` now monitors object deletions and modifications in the S3 source, when ran in streaming mode. When an object is deleted in S3, it is also removed from the engine. Similarly, if an object is modified in S3, the engine updates its state to reflect those changes. +- `pw.io.s3.read` now supports `with_metadata` flag, which makes it possible to attach the metadata of the source object to the table entries. + ### Fixed - `pw.xpacks.llm.document_store.DocumentStore` no longer requires `_metadata` column in the input table. diff --git a/integration_tests/s3/base.py b/integration_tests/s3/base.py index 2dbc38b4..65c21720 100644 --- a/integration_tests/s3/base.py +++ b/integration_tests/s3/base.py @@ -17,7 +17,7 @@ def put_aws_object(path, contents): aws_secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"], ) s3_client.put_object( - Bucket="aws-integrationtest", + Bucket=S3_BUCKET_NAME, Key=path, Body=contents, ) @@ -37,6 +37,31 @@ def put_minio_object(path, contents): ) +def delete_aws_object(path): + s3_client = boto3.client( + "s3", + aws_access_key_id=os.environ["AWS_S3_ACCESS_KEY"], + aws_secret_access_key=os.environ["AWS_S3_SECRET_ACCESS_KEY"], + ) + s3_client.delete_objects( + Bucket=S3_BUCKET_NAME, + Delete={"Objects": [{"Key": path}]}, + ) + + +def delete_minio_object(path): + s3_client = boto3.client( + "s3", + aws_access_key_id=os.environ["MINIO_S3_ACCESS_KEY"], + aws_secret_access_key=os.environ["MINIO_S3_SECRET_ACCESS_KEY"], + endpoint_url="https://minio-api.deploys.pathway.com", + ) + s3_client.delete_objects( + Bucket=MINIO_BUCKET_NAME, + Delete={"Objects": [{"Key": path}]}, + ) + + def put_object_into_storage(storage, path, contents): put_object_methods = { "s3": put_aws_object, @@ -45,8 +70,16 @@ def put_object_into_storage(storage, path, contents): return put_object_methods[storage](path, contents) +def delete_object_from_storage(storage, path): + delete_object_methods = { + "s3": delete_aws_object, + "minio": delete_minio_object, + } + return delete_object_methods[storage](path) + + def create_jsonlines(input_dicts): - return "\n".join([json.dumps(value) for value in input_dicts]) + return "\n".join([json.dumps(value) for value in input_dicts]) + "\n" def read_jsonlines_fields(path, keys_to_extract): diff --git a/integration_tests/s3/test_s3_formats.py b/integration_tests/s3/test_s3_formats.py index 0afdb420..bb27aef5 100644 --- a/integration_tests/s3/test_s3_formats.py +++ b/integration_tests/s3/test_s3_formats.py @@ -1,4 +1,5 @@ import json +import time import pytest @@ -10,16 +11,27 @@ @pytest.mark.parametrize("storage_type", ["s3", "minio"]) @pytest.mark.parametrize("format", ["binary", "plaintext", "plaintext_by_object"]) -def test_formats_without_parsing(storage_type, format, tmp_path, s3_path): +@pytest.mark.parametrize("with_metadata", [True, False]) +def test_formats_without_parsing( + storage_type, format, with_metadata, tmp_path, s3_path +): input_path = f"{s3_path}/input.txt" input_full_contents = "abc\n\ndef\nghi\njkl" output_path = tmp_path / "output.json" + uploaded_at = int(time.time()) put_object_into_storage(storage_type, input_path, input_full_contents) - table = create_table_for_storage(storage_type, input_path, format) + table = 
create_table_for_storage( + storage_type, input_path, format, with_metadata=with_metadata + ) pw.io.jsonlines.write(table, output_path) pw.run() + def check_metadata(metadata): + assert uploaded_at <= metadata["modified_at"] <= uploaded_at + 10 + assert metadata["path"] == input_path + assert metadata["size"] == len(input_full_contents) + if format in ("binary", "plaintext_by_object"): expected_output = ( [ord(c) for c in input_full_contents] @@ -29,11 +41,16 @@ def test_formats_without_parsing(storage_type, format, tmp_path, s3_path): with open(output_path) as f: result = json.load(f) assert result["data"] == expected_output + if with_metadata: + check_metadata(result["_metadata"]) else: lines = [] with open(output_path, "r") as f: for row in f: - lines.append(json.loads(row)["data"]) + result = json.loads(row) + lines.append(result["data"]) + if with_metadata: + check_metadata(result["_metadata"]) lines.sort() target = input_full_contents.split("\n") target.sort() diff --git a/integration_tests/s3/test_s3_generic.py b/integration_tests/s3/test_s3_generic.py index e38c5ce0..5258469f 100644 --- a/integration_tests/s3/test_s3_generic.py +++ b/integration_tests/s3/test_s3_generic.py @@ -42,7 +42,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str): ) G.clear() - input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde" + input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde\n" put_aws_object(s3_input_path, input_contents) table = pw.io.s3_csv.read( s3_path, @@ -62,7 +62,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str): ) G.clear() - input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde\n5,Hola" + input_contents = "key,value\n1,Hello\n2,World\n3,Bonjour\n4,Monde\n5,Hola\n" s3_input_path_2 = f"{s3_path}/input_2.csv" input_contents_2 = "key,value\n6,Mundo" output_path = tmp_path / "output_final.csv" diff --git a/integration_tests/s3/test_s3_streaming.py b/integration_tests/s3/test_s3_streaming.py new file mode 100644 index 00000000..bb20bc49 --- /dev/null +++ b/integration_tests/s3/test_s3_streaming.py @@ -0,0 +1,91 @@ +import pytest + +import pathway as pw +from pathway.tests.utils import ( + ExceptionAwareThread, + FileLinesNumberChecker, + wait_result_with_checker, +) + +from .base import ( + create_table_for_storage, + delete_object_from_storage, + put_object_into_storage, +) + + +@pytest.mark.parametrize("storage_type", ["s3", "minio"]) +def test_object_modified(tmp_path, s3_path, storage_type): + input_path = f"{s3_path}/input.txt" + contents = ["one", "two", "three"] + put_object_into_storage(storage_type, input_path, "\n".join(contents) + "\n") + + output_path = tmp_path / "output.json" + + def stream_data(): + wait_result_with_checker( + FileLinesNumberChecker(output_path, 3), 30, target=None + ) + + contents.append("four") + put_object_into_storage(storage_type, input_path, "\n".join(contents) + "\n") + wait_result_with_checker( + FileLinesNumberChecker(output_path, 4), 30, target=None + ) + + contents.append("five") + contents.append("six") + put_object_into_storage(storage_type, input_path, "\n".join(contents) + "\n") + wait_result_with_checker( + FileLinesNumberChecker(output_path, 6), 30, target=None + ) + + table = create_table_for_storage( + storage_type, s3_path, "plaintext", mode="streaming" + ) + pw.io.jsonlines.write(table, output_path) + + t = ExceptionAwareThread(target=stream_data) + t.start() + wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 90) + t.join() 
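# --- Editor's illustration (not part of the patch) ---
# The test above exercises the behavior announced in the CHANGELOG entry: when run in
# streaming mode, `pw.io.s3.read` now re-reads objects that were modified in S3 and
# retracts rows coming from objects that were deleted. A minimal user-facing sketch,
# assuming AWS credentials are available in the environment and that the bucket and
# prefix below are hypothetical:

import pathway as pw

table = pw.io.s3.read(
    "s3://my-bucket/logs/",   # hypothetical path prefix
    format="plaintext",
    mode="streaming",         # modifications and deletions are only tracked in streaming mode
    with_metadata=True,       # attaches the JSON `_metadata` column to each row
)
pw.io.jsonlines.write(table, "output.jsonl")
pw.run()

# While this pipeline runs, overwriting or deleting objects under the prefix updates or
# removes the corresponding rows downstream, which is what test_object_modified and
# test_object_deleted assert via FileLinesNumberChecker.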
+ + +@pytest.mark.parametrize("storage_type", ["s3", "minio"]) +def test_object_deleted(tmp_path, s3_path, storage_type): + input_path_1 = f"{s3_path}/input_1.txt" + input_path_2 = f"{s3_path}/input_2.txt" + input_path_3 = f"{s3_path}/input_3.txt" + put_object_into_storage(storage_type, input_path_1, "one") + + output_path = tmp_path / "output.json" + + def stream_data(): + wait_result_with_checker( + FileLinesNumberChecker(output_path, 1), 30, target=None + ) + + put_object_into_storage(storage_type, input_path_2, "two") + wait_result_with_checker( + FileLinesNumberChecker(output_path, 2), 30, target=None + ) + + delete_object_from_storage(storage_type, input_path_1) + delete_object_from_storage(storage_type, input_path_2) + wait_result_with_checker( + FileLinesNumberChecker(output_path, 4), 30, target=None + ) + + put_object_into_storage(storage_type, input_path_1, "four") + put_object_into_storage(storage_type, input_path_3, "three") + wait_result_with_checker( + FileLinesNumberChecker(output_path, 6), 30, target=None + ) + + table = create_table_for_storage(storage_type, s3_path, "binary", mode="streaming") + pw.io.jsonlines.write(table, output_path) + + t = ExceptionAwareThread(target=stream_data) + t.start() + wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 90) + t.join() diff --git a/integration_tests/webserver/test_rest_connector.py b/integration_tests/webserver/test_rest_connector.py index d570f179..2c13dd21 100644 --- a/integration_tests/webserver/test_rest_connector.py +++ b/integration_tests/webserver/test_rest_connector.py @@ -12,29 +12,12 @@ from pathway.internals.udfs.caches import InMemoryCache from pathway.tests.utils import ( CsvLinesNumberChecker, + ExceptionAwareThread, expect_csv_checker, wait_result_with_checker, ) -class ExceptionAwareThread(threading.Thread): - def run(self): - self._exception = None - try: - if self._target is not None: # type: ignore - self._result = self._target(*self._args, **self._kwargs) # type: ignore - except Exception as e: - self._exception = e - finally: - del self._target, self._args, self._kwargs # type: ignore - - def join(self, timeout=None): - super().join(timeout) - if self._exception: - raise self._exception - return self._result - - def _test_server_basic(tmp_path: pathlib.Path, port: int | str) -> None: output_path = tmp_path / "output.csv" diff --git a/integration_tests/wordcount/base.py b/integration_tests/wordcount/base.py index 8e2b4d39..88954a30 100644 --- a/integration_tests/wordcount/base.py +++ b/integration_tests/wordcount/base.py @@ -399,7 +399,8 @@ def generate_input(file_name, input_size, commit_frequency): dataset_line = json.dumps(dataset_line_dict) fw.write(dataset_line + "\n") if (seq_line_id + 1) % commit_frequency == 0: - fw.write(COMMIT_LINE) + # fw.write(COMMIT_LINE) + pass def generate_next_input(inputs_path): diff --git a/python/pathway/io/minio/__init__.py b/python/pathway/io/minio/__init__.py index 951ab9c9..7a7330af 100644 --- a/python/pathway/io/minio/__init__.py +++ b/python/pathway/io/minio/__init__.py @@ -63,6 +63,7 @@ def read( *, schema: type[Schema] | None = None, mode: str = "streaming", + with_metadata: bool = False, csv_settings: CsvParserSettings | None = None, json_field_paths: dict[str, str] | None = None, downloader_threads_count: int | None = None, @@ -92,6 +93,12 @@ def read( mode: If set to ``streaming``, the engine waits for the new objects under the given path prefix. Set it to ``static``, it only considers the available data and ingest all of it. 
Default value is ``streaming``. + with_metadata: When set to true, the connector will add an additional column + named ``_metadata`` to the table. This column will be a JSON field that will + contain an optional field ``modified_at``. Additionally, the column will also + have an optional field named ``owner`` containing an ID of the object owner. + Finally, the column will also contain a field named ``path`` that will show + the full path to the object within a bucket from where a row was filled. csv_settings: Settings for the CSV parser. This parameter is used only in case the specified format is "csv". json_field_paths: If the format is "json", this field allows to map field names @@ -149,6 +156,7 @@ def read( schema=schema, csv_settings=csv_settings, mode=mode, + with_metadata=with_metadata, autocommit_duration_ms=autocommit_duration_ms, persistent_id=persistent_id, json_field_paths=json_field_paths, diff --git a/python/pathway/io/s3/__init__.py b/python/pathway/io/s3/__init__.py index 2e93e464..f4b94817 100644 --- a/python/pathway/io/s3/__init__.py +++ b/python/pathway/io/s3/__init__.py @@ -98,6 +98,7 @@ def read( aws_s3_settings: AwsS3Settings | None = None, schema: type[Schema] | None = None, mode: str = "streaming", + with_metadata: bool = False, csv_settings: CsvParserSettings | None = None, json_field_paths: dict[str, str] | None = None, downloader_threads_count: int | None = None, @@ -129,6 +130,12 @@ def read( mode: If set to ``streaming``, the engine waits for the new objects under the given path prefix. Set it to ``static``, it only considers the available data and ingest all of it. Default value is ``streaming``. + with_metadata: When set to true, the connector will add an additional column + named ``_metadata`` to the table. This column will be a JSON field that will + contain an optional field ``modified_at``. Additionally, the column will also + have an optional field named ``owner`` containing an ID of the object owner. + Finally, the column will also contain a field named ``path`` that will show + the full path to the object within a bucket from where a row was filled. csv_settings: Settings for the CSV parser. This parameter is used only in case the specified format is ``csv``. json_field_paths: If the format is ``json``, this field allows to map field names @@ -274,6 +281,7 @@ def read( schema=schema, csv_settings=csv_settings, json_field_paths=json_field_paths, + with_metadata=with_metadata, _stacklevel=5, ) data_source_options = datasource.DataSourceOptions( @@ -300,6 +308,7 @@ def read_from_digital_ocean( *, schema: type[Schema] | None = None, mode: str = "streaming", + with_metadata: bool = False, csv_settings: CsvParserSettings | None = None, json_field_paths: dict[str, str] | None = None, downloader_threads_count: int | None = None, @@ -330,6 +339,12 @@ def read_from_digital_ocean( mode: If set to ``streaming``, the engine waits for the new objects under the given path prefix. Set it to ``static``, it only considers the available data and ingest all of it. Default value is ``streaming``. + with_metadata: When set to true, the connector will add an additional column + named ``_metadata`` to the table. This column will be a JSON field that will + contain an optional field ``modified_at``. Additionally, the column will also + have an optional field named ``owner`` containing an ID of the object owner. + Finally, the column will also contain a field named ``path`` that will show + the full path to the object within a bucket from where a row was filled. 
csv_settings: Settings for the CSV parser. This parameter is used only in case the specified format is "csv". json_field_paths: If the format is "json", this field allows to map field names @@ -397,6 +412,7 @@ def read_from_digital_ocean( schema=schema, csv_settings=csv_settings, json_field_paths=json_field_paths, + with_metadata=with_metadata, _stacklevel=5, ) datasource_options = datasource.DataSourceOptions( @@ -423,6 +439,7 @@ def read_from_wasabi( *, schema: type[Schema] | None = None, mode: str = "streaming", + with_metadata: bool = False, csv_settings: CsvParserSettings | None = None, json_field_paths: dict[str, str] | None = None, downloader_threads_count: int | None = None, @@ -452,6 +469,12 @@ def read_from_wasabi( mode: If set to ``streaming``, the engine waits for the new objects under the given path prefix. Set it to ``static``, it only considers the available data and ingest all of it. Default value is ``streaming``. + with_metadata: When set to true, the connector will add an additional column + named ``_metadata`` to the table. This column will be a JSON field that will + contain an optional field ``modified_at``. Additionally, the column will also + have an optional field named ``owner`` containing an ID of the object owner. + Finally, the column will also contain a field named ``path`` that will show + the full path to the object within a bucket from where a row was filled. csv_settings: Settings for the CSV parser. This parameter is used only in case the specified format is "csv". json_field_paths: If the format is "json", this field allows to map field names @@ -518,6 +541,7 @@ def read_from_wasabi( schema=schema, csv_settings=csv_settings, json_field_paths=json_field_paths, + with_metadata=with_metadata, _stacklevel=5, ) datasource_options = datasource.DataSourceOptions( diff --git a/python/pathway/tests/test_py_object_wrapper.py b/python/pathway/tests/test_py_object_wrapper.py index 823fbb94..810c087c 100644 --- a/python/pathway/tests/test_py_object_wrapper.py +++ b/python/pathway/tests/test_py_object_wrapper.py @@ -265,6 +265,8 @@ class Checker: error: AssertionError | None = None def __call__(self) -> bool: + if not output_path.exists(): + return False try: G.clear() result = pw.io.csv.read(output_path, schema=OutputSchema, mode="static") diff --git a/python/pathway/tests/utils.py b/python/pathway/tests/utils.py index 7c067b22..f606b68f 100644 --- a/python/pathway/tests/utils.py +++ b/python/pathway/tests/utils.py @@ -50,6 +50,24 @@ def skip_on_multiple_workers() -> None: pytest.skip() +class ExceptionAwareThread(threading.Thread): + def run(self): + self._exception = None + try: + if self._target is not None: # type: ignore + self._result = self._target(*self._args, **self._kwargs) # type: ignore + except Exception as e: + self._exception = e + finally: + del self._target, self._args, self._kwargs # type: ignore + + def join(self, timeout=None): + super().join(timeout) + if self._exception: + raise self._exception + return self._result + + class UniquePortDispenser: """ Tests are run simultaneously by several workers. 
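# --- Editor's illustration (not part of the patch) ---
# The `with_metadata` docstrings above describe the JSON `_metadata` column; the
# integration test checks its `path`, `modified_at` and `size` fields, and the
# `SourceMetadata` struct in src/connectors/metadata.rs below also carries an optional
# `owner`. A hedged sketch of surfacing those fields as plain columns, assuming a
# hypothetical bucket/prefix and Pathway's usual JSON accessors
# (`pw.this._metadata[...]`, `.as_str()`, `.as_int()`):

import pathway as pw

files = pw.io.s3.read(
    "s3://my-bucket/documents/",   # hypothetical prefix
    format="binary",
    mode="streaming",
    with_metadata=True,
)
file_stats = files.select(
    path=pw.this._metadata["path"].as_str(),
    modified_at=pw.this._metadata["modified_at"].as_int(),  # may be null if unavailable
    size=pw.this._metadata["size"].as_int(),
)
pw.io.jsonlines.write(file_stats, "file_stats.jsonl")
pw.run()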
diff --git a/src/connectors/data_storage.rs b/src/connectors/data_storage.rs index 6ba57587..1afbf70b 100644 --- a/src/connectors/data_storage.rs +++ b/src/connectors/data_storage.rs @@ -50,6 +50,7 @@ use crate::engine::Value; use crate::engine::{DateTimeNaive, DateTimeUtc, Duration as EngineDuration}; use crate::persistence::backends::Error as PersistenceBackendError; use crate::persistence::frontier::OffsetAntichain; +use crate::persistence::tracker::WorkerPersistentStorage; use crate::persistence::{ExternalPersistentId, PersistentId}; use crate::python_api::extract_value; use crate::python_api::threads::PythonThreadState; @@ -257,6 +258,9 @@ pub enum ReadError { #[error(transparent)] Bincode(#[from] BincodeError), + #[error(transparent)] + Persistence(#[from] PersistenceBackendError), + #[error("malformed data")] MalformedData, @@ -324,13 +328,14 @@ pub fn new_filesystem_reader( read_method: ReadMethod, object_pattern: &str, ) -> Result { - let scanner = FilesystemScanner::new(path, persistent_id, streaming_mode, object_pattern)?; + let scanner = FilesystemScanner::new(path, object_pattern)?; let tokenizer = BufReaderTokenizer::new(read_method); - Ok(PosixLikeReader::new( + PosixLikeReader::new( Box::new(scanner), Box::new(tokenizer), + streaming_mode, persistent_id, - )) + ) } pub fn new_csv_filesystem_reader( @@ -340,13 +345,14 @@ pub fn new_csv_filesystem_reader( persistent_id: Option, object_pattern: &str, ) -> Result { - let scanner = FilesystemScanner::new(path, persistent_id, streaming_mode, object_pattern)?; + let scanner = FilesystemScanner::new(path, object_pattern)?; let tokenizer = CsvTokenizer::new(parser_builder); - Ok(PosixLikeReader::new( + PosixLikeReader::new( Box::new(scanner), Box::new(tokenizer), + streaming_mode, persistent_id, - )) + ) } pub fn new_s3_generic_reader( @@ -357,18 +363,14 @@ pub fn new_s3_generic_reader( read_method: ReadMethod, downloader_threads_count: usize, ) -> Result { - let scanner = S3Scanner::new( - bucket, - objects_prefix, - streaming_mode, - downloader_threads_count, - )?; + let scanner = S3Scanner::new(bucket, objects_prefix, downloader_threads_count)?; let tokenizer = BufReaderTokenizer::new(read_method); - Ok(PosixLikeReader::new( + PosixLikeReader::new( Box::new(scanner), Box::new(tokenizer), + streaming_mode, persistent_id, - )) + ) } pub fn new_s3_csv_reader( @@ -379,18 +381,14 @@ pub fn new_s3_csv_reader( persistent_id: Option, downloader_threads_count: usize, ) -> Result { - let scanner = S3Scanner::new( - bucket, - objects_prefix, - streaming_mode, - downloader_threads_count, - )?; + let scanner = S3Scanner::new(bucket, objects_prefix, downloader_threads_count)?; let tokenizer = CsvTokenizer::new(parser_builder); - Ok(PosixLikeReader::new( + PosixLikeReader::new( Box::new(scanner), Box::new(tokenizer), + streaming_mode, persistent_id, - )) + ) } pub trait Reader { @@ -401,6 +399,13 @@ pub trait Reader { fn update_persistent_id(&mut self, persistent_id: Option); fn persistent_id(&self) -> Option; + fn initialize_cached_objects_storage( + &mut self, + _: &WorkerPersistentStorage, + _: PersistentId, + ) -> Result<(), ReadError> { + Ok(()) + } fn merge_two_frontiers(lhs: &OffsetAntichain, rhs: &OffsetAntichain) -> OffsetAntichain where @@ -819,13 +824,6 @@ impl ConnectorMode { ConnectorMode::Streaming => true, } } - - pub fn are_deletions_enabled(&self) -> bool { - match self { - ConnectorMode::Static => false, - ConnectorMode::Streaming => true, - } - } } pub struct PythonReaderBuilder { diff --git a/src/connectors/data_tokenize.rs 
b/src/connectors/data_tokenize.rs index 6b3cc395..64a7a105 100644 --- a/src/connectors/data_tokenize.rs +++ b/src/connectors/data_tokenize.rs @@ -1,6 +1,5 @@ // Copyright © 2024 Pathway -use log::error; use std::io::BufReader; use std::io::Read; use std::mem::take; @@ -20,7 +19,6 @@ pub trait Tokenize: Send + 'static { data_event_type: DataEventType, ) -> Result<(), ReadError>; fn next_entry(&mut self) -> Result, ReadError>; - fn seek(&mut self, bytes_offset: u64) -> Result<(), ReadError>; } pub struct CsvTokenizer { @@ -77,38 +75,6 @@ impl Tokenize for CsvTokenizer { Ok(None) } } - - fn seek(&mut self, bytes_offset: u64) -> Result<(), ReadError> { - if bytes_offset == 0 { - return Ok(()); - } - let csv_reader = self - .csv_reader - .as_mut() - .expect("seek for an uninitialized tokenizer"); - - let mut header_record = csv::StringRecord::new(); - if csv_reader.read_record(&mut header_record)? { - let header_reader_context = ReaderContext::from_tokenized_entries( - self.current_event_type, - header_record - .iter() - .map(std::string::ToString::to_string) - .collect(), - ); - self.deferred_next_entry = Some((header_reader_context, bytes_offset)); - } - - let mut current_offset = csv_reader.position().byte(); - let mut current_record = csv::StringRecord::new(); - while current_offset < bytes_offset && csv_reader.read_record(&mut current_record)? { - current_offset = csv_reader.position().byte(); - } - if current_offset != bytes_offset { - error!("Inconsistent bytes position in rewinded CSV object: expected {bytes_offset}, got {current_offset}"); - } - Ok(()) - } } pub struct BufReaderTokenizer { @@ -162,38 +128,4 @@ impl Tokenize for BufReaderTokenizer { Ok(None) } } - - fn seek(&mut self, bytes_offset: u64) -> Result<(), ReadError> { - if bytes_offset == 0 { - return Ok(()); - } - - let reader = self - .reader - .as_mut() - .expect("seek for an uninitialized tokenizer"); - - let mut bytes_read = 0; - while bytes_read < bytes_offset { - let mut current_line = Vec::new(); - let len = self - .read_method - .read_next_bytes(reader, &mut current_line)?; - if len == 0 { - break; - } - bytes_read += len as u64; - } - - if bytes_read != bytes_offset { - if bytes_read == bytes_offset + 1 || bytes_read == bytes_offset + 2 { - error!("Read {} bytes instead of expected {bytes_read}. 
If the file did not have newline at the end, you can ignore this message", bytes_offset); - } else { - error!("Inconsistent bytes position in rewinded plaintext object: expected {bytes_read}, got {bytes_offset}"); - } - } - - self.current_bytes_read = bytes_read; - Ok(()) - } } diff --git a/src/connectors/metadata.rs b/src/connectors/metadata.rs index 8f0bcd26..5cdcc7f4 100644 --- a/src/connectors/metadata.rs +++ b/src/connectors/metadata.rs @@ -1,28 +1,34 @@ // Copyright © 2024 Pathway +use log::error; use std::path::Path; use std::time::{SystemTime, UNIX_EPOCH}; -use serde::Serialize; +use chrono::DateTime; +use s3::serde_types::Object as S3Object; +use serde::{Deserialize, Serialize}; use crate::timestamp::current_unix_timestamp_secs; /// Basic metadata for a file-like object #[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Ord, PartialOrd)] pub struct SourceMetadata { // Creation and modification time may not be available at some platforms // Stored in u64 for easy serialization created_at: Option, pub modified_at: Option, - // Owner may be unavailable at some platforms and on S3 + // Owner may be unavailable at some platforms owner: Option, // Path should always be available. We make it String for two reasons: // * S3 path is denoted as a String // * This object is directly serialized and passed into a connector row - path: String, + pub path: String, + + // Size (in bytes) should be always available. + pub size: u64, // Record acquisition time. Required for the real-time indexer processes // to determine the gap between finding file and indexing it. @@ -40,9 +46,46 @@ impl SourceMetadata { modified_at, owner, path: path.to_string_lossy().to_string(), + size: meta.len(), + seen_at: current_unix_timestamp_secs(), + } + } + + pub fn from_s3_object(object: &S3Object) -> Self { + let modified_at: Option = match DateTime::parse_from_rfc3339(&object.last_modified) { + Ok(last_modified) => { + if let Ok(last_modified) = last_modified.timestamp().try_into() { + Some(last_modified) + } else { + error!("S3 modification time is not a UNIX timestamp: {last_modified}"); + None + } + } + Err(e) => { + error!( + "Failed to parse RFC 3339 timestamp '{}' from S3 metadata: {e}", + object.last_modified + ); + None + } + }; + + Self { + created_at: None, + modified_at, + owner: object.owner.as_ref().map(|owner| owner.id.clone()), + path: object.key.clone(), + size: object.size, seen_at: current_unix_timestamp_secs(), } } + + /// Checks if file contents could have been changed. 
+ pub fn is_changed(&self, other: &SourceMetadata) -> bool { + self.modified_at != other.modified_at + || self.size != other.size + || self.owner != other.owner + } } #[cfg(target_os = "linux")] diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs index 68b5f979..3db0fe50 100644 --- a/src/connectors/mod.rs +++ b/src/connectors/mod.rs @@ -346,12 +346,21 @@ impl Connector { snapshot_access: SnapshotAccess, realtime_reader_needed: bool, ) -> Result<(), ReadError> { + info!( + "Enter read_snapshot method with reader {:?}", + reader.storage_type() + ); let mut frontier = OffsetAntichain::new(); if snapshot_access.is_replay_allowed() { persistence_mode.on_before_reading_snapshot(sender); - // Rewind the data source - if let Some(persistent_storage) = persistent_storage { - if let Some(persistent_id) = reader.persistent_id() { + } + if let Some(persistent_storage) = persistent_storage { + if let Some(persistent_id) = reader.persistent_id() { + reader.initialize_cached_objects_storage( + &persistent_storage.lock().unwrap(), + persistent_id, + )?; + if snapshot_access.is_replay_allowed() { if persistent_storage .lock() .unwrap() diff --git a/src/connectors/offset.rs b/src/connectors/offset.rs index d76d9159..67711387 100644 --- a/src/connectors/offset.rs +++ b/src/connectors/offset.rs @@ -48,7 +48,7 @@ pub enum OffsetValue { PosixLikeOffset { total_entries_read: u64, path: Arc<[u8]>, - bytes_offset: u64, + bytes_offset: u64, // No longer needed }, PythonCursor { raw_external_offset: Arc<[u8]>, diff --git a/src/connectors/posix_like.rs b/src/connectors/posix_like.rs index 498fa34a..4818a565 100644 --- a/src/connectors/posix_like.rs +++ b/src/connectors/posix_like.rs @@ -1,83 +1,160 @@ // Copyright © 2024 Pathway -use log::warn; +use log::{error, info, warn}; +use std::collections::VecDeque; +use std::io::Cursor; +use std::mem::take; +use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; +use crate::connectors::data_storage::ConnectorMode; use crate::connectors::data_tokenize::Tokenize; -use crate::connectors::scanner::PosixLikeScanner; -use crate::connectors::{DataEventType, ReadError, ReadResult, Reader, StorageType}; -use crate::connectors::{OffsetKey, OffsetValue}; +use crate::connectors::scanner::{PosixLikeScanner, QueuedAction}; +use crate::connectors::{ + DataEventType, OffsetKey, OffsetValue, ReadError, ReadResult, Reader, StorageType, +}; +use crate::persistence::backends::MemoryKVStorage; +use crate::persistence::cached_object_storage::CachedObjectStorage; use crate::persistence::frontier::OffsetAntichain; +use crate::persistence::tracker::WorkerPersistentStorage; use crate::persistence::PersistentId; +struct CurrentAction { + action: QueuedAction, + offset_path: Arc<[u8]>, +} + +impl From for CurrentAction { + fn from(action: QueuedAction) -> Self { + Self { + offset_path: action.path().into(), + action, + } + } +} + #[allow(clippy::module_name_repetitions)] pub struct PosixLikeReader { scanner: Box, tokenizer: Box, persistent_id: Option, + streaming_mode: ConnectorMode, total_entries_read: u64, - deferred_read_result: Option, + had_queue_refresh: bool, + cached_object_storage: CachedObjectStorage, + current_action: Option, + scanner_actions_queue: VecDeque, } impl PosixLikeReader { pub fn new( scanner: Box, tokenizer: Box, + streaming_mode: ConnectorMode, persistent_id: Option, - ) -> Self { - Self { + ) -> Result { + Ok(Self { scanner, tokenizer, + streaming_mode, persistent_id, total_entries_read: 0, - deferred_read_result: None, - } + had_queue_refresh: false, + 
current_action: None, + scanner_actions_queue: VecDeque::new(), + cached_object_storage: CachedObjectStorage::new(Box::new(MemoryKVStorage::new()))?, + }) } } impl Reader for PosixLikeReader { fn seek(&mut self, frontier: &OffsetAntichain) -> Result<(), ReadError> { + // FIXME: having only a path in the offset is not enough. + // Suppose there were several modifications of a file, at 12:01, 12:02, 12:03. + // All of them have been processed by the scanner and sent on the outside. This way, + // the latest known version of this file corresponds to 12:03. + // + // Now suppose that the threshold time on the recovery corresponds to 12:02. Then + // the version that corresponds to 12:02 must be loaded to the CachedObjectStorage. + // + // Moreover, all further read object modifications must be omitted and treated as + // something that the scanner has never met. + // + // It can be done if: + // 1. We use metadata as an offset, not just an object name. The metadata has the `seen_at` + // field that can work as a version already. + // 2. We store all versions in the CachedObjectStorage. + // 3. On recovery, we remove use the version that corresponds to the provided metadata. + // 4. After the recovery is done, we delete everything that has `seen_at` greater than + // `seen_at` of the target object as well as objects with the same `seen_at` and greater + // `path` lexicographically. + // + // Also to be noted that `seen_at` must be handled correctly in case the objects are + // downloaded via thread pool. If the metadata is formed not while downloading, it + // will be OK. Anyway, an assert in `CachedObjectStorage` that checks that the pairs + // of (`seen_at`, `path`) form an increasing sequence, would help. let offset_value = frontier.get_offset(&OffsetKey::Empty); let Some(offset_value) = offset_value else { + self.cached_object_storage.clear()?; return Ok(()); }; let Some(OffsetValue::PosixLikeOffset { total_entries_read, path: object_path_arc, - bytes_offset, + bytes_offset: _, }) = offset_value.as_posix_like_offset() else { warn!("Incorrect type of offset value in PosixLike frontier: {offset_value:?}"); return Ok(()); }; - // Scanner part: detect already processed objects - self.scanner.seek_to_object(object_path_arc.as_ref())?; + let stored_metadata = self + .cached_object_storage + .stored_metadata(object_path_arc.as_ref()) + .expect("Cached object storage must contain metadata for a cached object"); + self.scanner_actions_queue.clear(); + self.current_action = + Some(QueuedAction::Read(object_path_arc.to_vec(), stored_metadata.clone()).into()); + let are_deletions_enabled = self.are_deletions_enabled(); + let actual_metadata = self.scanner.object_metadata(object_path_arc.as_ref())?; + if let Some(metadata) = actual_metadata { + let reread_needed = stored_metadata.is_changed(&metadata); + if reread_needed && are_deletions_enabled { + info!( + "The last read object has changed since it was last read. It will be reread." + ); + self.scanner_actions_queue + .push_back(QueuedAction::Update(object_path_arc.to_vec(), metadata)); + self.current_action = None; + } + } else if are_deletions_enabled { + info!("The last read object is no longer present in the source. It will be removed from the engine."); + self.scanner_actions_queue + .push_back(QueuedAction::Delete(object_path_arc.to_vec())); + self.current_action = None; + } - // If the last object read is missing from the offset, the program will read all objects in the directory. - // We could consider alternative strategies: - // 1. 
Return an error and stop the program; - // 2. Track all objects that have been read in the offset (though this would greatly increase the offset size); - // 3. Only use the object's last modification time. - let Ok(Some(reader)) = self.scanner.current_object_reader() else { - return Ok(()); - }; + // No need to set up a tokenizer here: `current_action` is set only + // in case the object had already been read in full and requires no + // further processing. self.total_entries_read = total_entries_read; + Ok(()) + } - // Seek within a particular object - self.tokenizer - .set_new_reader(reader, DataEventType::Insert)?; - self.tokenizer.seek(bytes_offset)?; - + fn initialize_cached_objects_storage( + &mut self, + persistence_manager: &WorkerPersistentStorage, + persistent_id: PersistentId, + ) -> Result<(), ReadError> { + self.cached_object_storage = + persistence_manager.create_cached_object_storage(persistent_id)?; Ok(()) } fn read(&mut self) -> Result { - if let Some(deferred_read_result) = self.deferred_read_result.take() { - return Ok(deferred_read_result); - } - // Try to continue to read the current object. let maybe_entry = self.tokenizer.next_entry()?; if let Some((entry, bytes_offset)) = maybe_entry { @@ -86,7 +163,7 @@ impl Reader for PosixLikeReader { OffsetKey::Empty, OffsetValue::PosixLikeOffset { total_entries_read: self.total_entries_read, - path: self.scanner.current_offset_object().clone().unwrap(), + path: self.current_action.as_ref().unwrap().offset_path.clone(), bytes_offset, }, ); @@ -95,16 +172,8 @@ impl Reader for PosixLikeReader { // We've failed to read the current object because it's over. // Then let's try to find the next one. - let next_read_result = self.scanner.next_scanner_action()?; + let next_read_result = self.next_scanner_action()?; if let Some(next_read_result) = next_read_result { - if let Some(reader) = self.scanner.current_object_reader()? { - self.tokenizer.set_new_reader( - reader, - self.scanner.data_event_type().expect( - "inconsistent: current object is determined but the event type isn't", - ), - )?; - } return Ok(next_read_result); } @@ -123,3 +192,110 @@ impl Reader for PosixLikeReader { StorageType::PosixLike } } + +impl PosixLikeReader { + fn next_scanner_action(&mut self) -> Result, ReadError> { + // If there is an ongoing action, we must finalize it + // and emit the corresponding event. + if let Some(current_action) = take(&mut self.current_action) { + let commit_allowed = match current_action.action { + QueuedAction::Delete(path) => { + self.cached_object_storage + .remove_object(path.as_ref()) + .expect("Cached object storage doesn't contain an indexed object"); + true + } + QueuedAction::Update(path, metadata) => { + self.scanner_actions_queue + .push_front(QueuedAction::Read(path, metadata)); + false + } + QueuedAction::Read(path, _) => { + let are_deletions_enabled = self.are_deletions_enabled(); + let is_persisted = self.persistent_id.is_some(); + if !is_persisted && !are_deletions_enabled { + // Don't store a copy in memory if it won't be + // needed for undoing an object. 
+ self.cached_object_storage + .remove_object(path.as_ref()) + .expect("Removal from InMemory cache should not fail"); + } + true + } + }; + return Ok(Some(ReadResult::FinishedSource { commit_allowed })); + } + + // Find the next valid action to execute + let are_deletions_enabled = self.are_deletions_enabled(); + loop { + let action = self.scanner_actions_queue.pop_front(); + match &action { + Some(QueuedAction::Read(path, metadata)) => { + let Ok(cached_object_contents) = self.scanner.read_object(path.as_ref()) else { + error!("Failed to get contents of a queued object {metadata:?}"); + continue; + }; + let contents_for_caching = if are_deletions_enabled { + cached_object_contents.clone() + } else { + Vec::with_capacity(0) + }; + self.cached_object_storage.place_object( + path.as_ref(), + contents_for_caching, + metadata.clone(), + )?; + let reader = Box::new(Cursor::new(cached_object_contents)); + self.tokenizer + .set_new_reader(reader, DataEventType::Insert)?; + let result = ReadResult::NewSource(Some(metadata.clone())); + self.current_action = Some(action.unwrap().into()); + return Ok(Some(result)); + } + Some(QueuedAction::Delete(path) | QueuedAction::Update(path, _)) => { + let old_metadata = self + .cached_object_storage + .stored_metadata(path.as_ref()) + .expect("Metadata for all indexed objects must be stored in the engine"); + let cached_object_contents = self + .cached_object_storage + .get_object(path.as_ref()) + .expect("Copy of a cached object must be present to perform deletion"); + let reader = Box::new(Cursor::new(cached_object_contents)); + self.tokenizer + .set_new_reader(reader, DataEventType::Delete)?; + let result = ReadResult::NewSource(Some(old_metadata.clone())); + self.current_action = Some(action.unwrap().into()); + return Ok(Some(result)); + } + None => { + if self.streaming_mode.is_polling_enabled() || !self.had_queue_refresh { + self.had_queue_refresh = true; + let new_actions = self.scanner.next_scanner_actions( + are_deletions_enabled, + &self.cached_object_storage, + )?; + for action in new_actions { + self.scanner_actions_queue.push_back(action); + } + if self.scanner_actions_queue.is_empty() { + // Don't poll the backend too often. 
+ sleep(Self::sleep_duration()); + } + } else { + return Ok(None); + } + } + } + } + } + + fn are_deletions_enabled(&self) -> bool { + self.persistent_id.is_some() || self.streaming_mode.is_polling_enabled() + } + + fn sleep_duration() -> Duration { + Duration::from_millis(500) + } +} diff --git a/src/connectors/scanner/filesystem.rs b/src/connectors/scanner/filesystem.rs index fe6201ff..fe0a246a 100644 --- a/src/connectors/scanner/filesystem.rs +++ b/src/connectors/scanner/filesystem.rs @@ -1,330 +1,111 @@ -use rand::Rng; -use std::collections::HashMap; -use std::collections::VecDeque; -use std::env; use std::ffi::OsStr; use std::fmt::Debug; -use std::fs::File; -use std::io; -use std::io::Read; -use std::mem::take; use std::os::unix::ffi::OsStrExt; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::thread::sleep; -use std::time::{Duration, SystemTime}; +use std::path::PathBuf; -use log::{error, warn}; -use tempfile::{tempdir, TempDir}; -use xxhash_rust::xxh3::Xxh3 as Hasher; +use log::error; -use crate::connectors::data_storage::{ConnectorMode, DataEventType}; use crate::connectors::metadata::SourceMetadata; -use crate::connectors::scanner::PosixLikeScanner; -use crate::connectors::{ReadError, ReadResult}; -use crate::fs_helpers::ensure_directory; -use crate::persistence::PersistentId; -use crate::timestamp::current_unix_timestamp_secs; +use crate::connectors::scanner::{PosixLikeScanner, QueuedAction}; +use crate::connectors::ReadError; +use crate::persistence::cached_object_storage::CachedObjectStorage; use glob::Pattern as GlobPattern; -#[derive(Clone, Debug)] -pub enum PosixScannerAction { - Read(Arc), - Delete(Arc), -} - #[derive(Debug)] #[allow(clippy::module_name_repetitions)] pub struct FilesystemScanner { path: GlobPattern, - cache_directory_path: Option, - streaming_mode: ConnectorMode, object_pattern: String, - - // Mapping from the path of the loaded file to its modification timestamp - known_files: HashMap, - - current_action: Option, - cached_modify_times: HashMap>, - next_file_for_insertion: Option, - cached_metadata: HashMap>, - scanner_actions_queue: VecDeque, - - // Storage is deleted on object destruction, so we need to store it - // for the connector's life time - _connector_tmp_storage: Option, } impl PosixLikeScanner for FilesystemScanner { - fn current_object_reader( - &mut self, - ) -> Result>, ReadError> { - let raw_file_path = match &self.current_action { - Some(PosixScannerAction::Read(path)) => path.clone().as_os_str().as_bytes().to_vec(), - Some(PosixScannerAction::Delete(path)) => self - .cached_file_path(path) - .map(|x| x.as_os_str().as_bytes().to_vec()) - .expect("Cached file path must be present"), - None => return Ok(None), - }; - let file_path: PathBuf = OsStr::from_bytes(&raw_file_path).into(); - let opened_file = File::open(file_path.as_path())?; - let reader = Box::new(opened_file); - Ok(Some(reader)) - } - - fn data_event_type(&self) -> Option { - self.current_action - .as_ref() - .map(|current_action| match current_action { - PosixScannerAction::Read(_) => DataEventType::Insert, - PosixScannerAction::Delete(_) => DataEventType::Delete, - }) - } - - fn current_offset_object(&self) -> Option> { - match &self.current_action { - Some(PosixScannerAction::Read(path) | PosixScannerAction::Delete(path)) => { - Some(path.clone().as_os_str().as_bytes().into()) - } - None => None, - } - } - - fn seek_to_object(&mut self, seek_file_path: &[u8]) -> Result<(), ReadError> { - let seek_file_path: PathBuf = OsStr::from_bytes(seek_file_path).into(); - if 
self.streaming_mode.are_deletions_enabled() { - warn!("seek for snapshot mode may not work correctly in case deletions take place"); - } - - self.known_files.clear(); - let target_modify_time = match std::fs::metadata(&seek_file_path) { - Ok(metadata) => metadata.modified()?, + fn object_metadata(&mut self, object_path: &[u8]) -> Result, ReadError> { + let path: PathBuf = OsStr::from_bytes(object_path).into(); + match std::fs::metadata(&path) { + Ok(metadata) => Ok(Some(SourceMetadata::from_fs_meta(&path, &metadata))), Err(e) => { - if !matches!(e.kind(), std::io::ErrorKind::NotFound) { - return Err(ReadError::Io(e)); + if matches!(e.kind(), std::io::ErrorKind::NotFound) { + Ok(None) + } else { + Err(ReadError::Io(e)) } - warn!( - "Unable to restore state: last persisted file {seek_file_path:?} not found in directory. Processing all files in directory." - ); - return Ok(()); - } - }; - let matching_files: Vec = self.get_matching_file_paths()?; - for entry in matching_files { - if !entry.is_file() { - continue; - } - let Some(modify_time) = self.modify_time(&entry) else { - continue; - }; - if (modify_time, entry.as_path()) <= (target_modify_time, &seek_file_path) { - let modify_timestamp = modify_time - .duration_since(SystemTime::UNIX_EPOCH) - .expect("System time should be after the Unix epoch") - .as_secs(); - self.known_files.insert(entry, modify_timestamp); } } - self.current_action = Some(PosixScannerAction::Read(Arc::new(seek_file_path.clone()))); - - Ok(()) } - fn next_scanner_action(&mut self) -> Result, ReadError> { - // If there is an ongoing action, we must finalize it - // and emit the corresponding event. - if let Some(current_action) = take(&mut self.current_action) { - match current_action { - PosixScannerAction::Delete(path) => { - let cached_path = self - .cached_file_path(&path) - .expect("in case of enabled deletions cache should exist"); - std::fs::remove_file(cached_path)?; - - // Whether we can commit or not depends on whether this deletion was a part - // of a file replacement or not. Therefore we can use `self.next_file_for_insertion` - // to learn if it's the case. - return Ok(Some(ReadResult::FinishedSource { - commit_allowed: self.next_file_for_insertion.is_none(), - })); - } - PosixScannerAction::Read(_) => { - // The insertion is done. There are two cases that the can involve the replacement: - // 1. It was a file replacement. Then it means that the file replacement has finished - // and we can proceed with allowing commits. - // 2. It was a file insertion. Then it means that the file insertion is done and we - // can also proceed with allowing commits. - // - // So whatever case we have, we can proceed with allowing deletions. - return Ok(Some(ReadResult::FinishedSource { - commit_allowed: true, - })); - } - } - } - - // File modification is handled as combination of its deletion and insertion - // If a file was deleted in the last action, now we must add it, and after that - // we may allow commits. - if let Some(next_file_for_insertion) = take(&mut self.next_file_for_insertion) { - if next_file_for_insertion.exists() { - return Ok(Some( - self.initiate_file_insertion(&next_file_for_insertion)?, - )); - } - - // The scheduled insertion after deletion is impossible because - // the file has already been deleted. - // The action was done in full now, and we can allow commits. 
- return Ok(Some(ReadResult::FinishedSource { - commit_allowed: true, - })); - } - - if self.scanner_actions_queue.is_empty() { - self.enqueue_next_actions()?; - } + fn read_object(&mut self, object_path: &[u8]) -> Result, ReadError> { + let path: PathBuf = OsStr::from_bytes(object_path).into(); + Ok(std::fs::read(path)?) + } - // Find the next valid action to execute - loop { - match self.scanner_actions_queue.pop_front() { - Some(PosixScannerAction::Read(path)) => { - let next_action = self.initiate_file_insertion(&path); - match next_action { - Ok(next_action) => return Ok(Some(next_action)), - Err(e) => { - // If there was a planned action to add a file, but - // it no longer exists, proceed to the next planned action - if e.kind() == std::io::ErrorKind::NotFound { - continue; - } - return Err(ReadError::Io(e)); - } - }; - } - Some(PosixScannerAction::Delete(path)) => { - return Ok(Some(self.initiate_file_deletion(&path))) - } - None => { - if self.streaming_mode.is_polling_enabled() { - sleep(Self::sleep_duration()); - self.enqueue_next_actions()?; - } else { - return Ok(None); - } - } - } + fn next_scanner_actions( + &mut self, + are_deletions_enabled: bool, + cached_object_storage: &CachedObjectStorage, + ) -> Result, ReadError> { + let mut result = Vec::new(); + if are_deletions_enabled { + result.append(&mut Self::new_deletion_and_replacement_actions( + cached_object_storage, + )); } + result.append(&mut self.new_insertion_actions(cached_object_storage)?); + Ok(result) } } impl FilesystemScanner { - pub fn new( - path: &str, - persistent_id: Option, - streaming_mode: ConnectorMode, - object_pattern: &str, - ) -> Result { + pub fn new(path: &str, object_pattern: &str) -> Result { let path_glob = GlobPattern::new(path)?; - - let (cache_directory_path, connector_tmp_storage) = { - if streaming_mode.are_deletions_enabled() { - if let Ok(root_dir_str_path) = env::var("PATHWAY_PERSISTENT_STORAGE") { - let root_dir_path = Path::new(&root_dir_str_path); - ensure_directory(root_dir_path)?; - let unique_id = - persistent_id.unwrap_or_else(|| rand::thread_rng().gen::()); - let connector_tmp_directory = root_dir_path.join(format!("cache-{unique_id}")); - ensure_directory(&connector_tmp_directory)?; - (Some(connector_tmp_directory), None) - } else { - let cache_tmp_storage = tempdir()?; - let connector_tmp_directory = cache_tmp_storage.path(); - ( - Some(connector_tmp_directory.to_path_buf()), - Some(cache_tmp_storage), - ) - } - } else { - (None, None) - } - }; - Ok(Self { path: path_glob, - streaming_mode, - cache_directory_path, - object_pattern: object_pattern.to_string(), - known_files: HashMap::new(), - current_action: None, - cached_modify_times: HashMap::new(), - next_file_for_insertion: None, - cached_metadata: HashMap::new(), - scanner_actions_queue: VecDeque::new(), - _connector_tmp_storage: connector_tmp_storage, }) } - fn modify_time(&mut self, entry: &Path) -> Option { - if self.streaming_mode.are_deletions_enabled() { - // If deletions are enabled, we also need to handle the case when the modification - // time of an entry changes. Hence, we can't just memorize it once. 
- entry.metadata().ok()?.modified().ok() - } else { - *self - .cached_modify_times - .entry(entry.to_path_buf()) - .or_insert_with(|| entry.metadata().ok()?.modified().ok()) - } - } - - fn enqueue_next_actions(&mut self) -> Result<(), ReadError> { - if self.streaming_mode.are_deletions_enabled() { - self.enqueue_deletion_entries(); - } - self.enqueue_insertion_entries() - } - - fn enqueue_deletion_entries(&mut self) { - let mut paths_for_deletion = Vec::new(); - for (path, modified_at) in &self.known_files { - let metadata = std::fs::metadata(path); - let needs_deletion = { - match metadata { - Err(e) => e.kind() == std::io::ErrorKind::NotFound, - Ok(metadata) => { - if let Ok(new_modification_time) = metadata.modified() { - let modified_at_new = new_modification_time - .duration_since(SystemTime::UNIX_EPOCH) - .expect("System time should be after the Unix epoch") - .as_secs(); - modified_at_new != *modified_at - } else { - false - } + fn new_deletion_and_replacement_actions( + cached_object_storage: &CachedObjectStorage, + ) -> Vec { + let mut result = Vec::new(); + for (encoded_path, stored_metadata) in cached_object_storage.get_iter() { + let path: PathBuf = OsStr::from_bytes(encoded_path).into(); + match std::fs::metadata(&path) { + Err(e) => { + let is_deleted = e.kind() == std::io::ErrorKind::NotFound; + if is_deleted { + result.push(QueuedAction::Delete(encoded_path.clone())); + } + } + Ok(metadata) => { + let actual_metadata = SourceMetadata::from_fs_meta(&path, &metadata); + let is_updated = stored_metadata.is_changed(&actual_metadata); + if is_updated { + result.push(QueuedAction::Update(encoded_path.clone(), actual_metadata)); } } }; - if needs_deletion { - paths_for_deletion.push(path.clone()); - } - } - paths_for_deletion.sort_unstable(); - for path in paths_for_deletion { - self.scanner_actions_queue - .push_back(PosixScannerAction::Delete(Arc::new(path.clone()))); } + result } - fn cached_file_path(&self, path: &Path) -> Option { - self.cache_directory_path.as_ref().map(|root_path| { - let mut hasher = Hasher::default(); - hasher.update(path.as_os_str().as_bytes()); - root_path.join(format!("{}", hasher.digest128())) - }) + fn new_insertion_actions( + &mut self, + cached_object_storage: &CachedObjectStorage, + ) -> Result, ReadError> { + let mut result = Vec::new(); + for entry in self.get_matching_file_paths()? 
{ + let object_key = entry.as_os_str().as_bytes(); + if cached_object_storage.contains_object(object_key) { + continue; + } + let metadata = match std::fs::metadata(&entry) { + Err(_) => continue, + Ok(metadata) => SourceMetadata::from_fs_meta(&entry, &metadata), + }; + result.push(QueuedAction::Read(object_key.into(), metadata)); + } + Ok(result) } fn get_matching_file_paths(&self) -> Result, ReadError> { @@ -355,74 +136,4 @@ impl FilesystemScanner { Ok(result) } - - fn enqueue_insertion_entries(&mut self) -> Result<(), ReadError> { - let matching_files: Vec = self.get_matching_file_paths()?; - let mut new_detected_files: Vec<(SystemTime, PathBuf)> = Vec::new(); - for entry in matching_files { - if !entry.is_file() || self.known_files.contains_key(&(*entry)) { - continue; - } - let Some(modify_time) = self.modify_time(&entry) else { - continue; - }; - new_detected_files.push((modify_time, entry)); - } - - new_detected_files.sort_unstable(); - for (_, path) in new_detected_files { - self.scanner_actions_queue - .push_back(PosixScannerAction::Read(Arc::new(path.clone()))); - } - - Ok(()) - } - - fn initiate_file_insertion(&mut self, new_file_name: &PathBuf) -> io::Result { - let new_file_meta = - SourceMetadata::from_fs_meta(new_file_name, &std::fs::metadata(new_file_name)?); - let cached_path = self.cached_file_path(new_file_name); - if let Some(cached_path) = cached_path { - std::fs::copy(new_file_name, cached_path)?; - } - - // The file has been successfully saved at this point. - // Now we can change the internal state. - - self.cached_metadata - .insert(new_file_name.clone(), Some(new_file_meta.clone())); - self.known_files.insert( - new_file_name.clone(), - new_file_meta - .modified_at - .unwrap_or(current_unix_timestamp_secs()), - ); - - self.current_action = Some(PosixScannerAction::Read(Arc::new(new_file_name.clone()))); - Ok(ReadResult::NewSource(Some(new_file_meta))) - } - - fn initiate_file_deletion(&mut self, path: &PathBuf) -> ReadResult { - // Metadata of the deleted file must be the same as when it was added - // so that the deletion event is processed correctly by timely. To achieve - // this, we just take the cached metadata - let old_metadata = self - .cached_metadata - .remove(path) - .expect("inconsistency between known_files and cached_metadata"); - - self.known_files.remove(&path.clone().clone()); - self.current_action = Some(PosixScannerAction::Delete(Arc::new(path.clone()))); - if path.exists() { - // If the path exists it means file modification. In this scenatio, the file - // needs first to be deleted and then to be inserted again. 
- self.next_file_for_insertion = Some(path.clone()); - } - - ReadResult::NewSource(old_metadata) - } - - fn sleep_duration() -> Duration { - Duration::from_millis(500) - } } diff --git a/src/connectors/scanner/mod.rs b/src/connectors/scanner/mod.rs index a465dac1..95b1f183 100644 --- a/src/connectors/scanner/mod.rs +++ b/src/connectors/scanner/mod.rs @@ -1,8 +1,6 @@ -use std::io::Read; -use std::sync::Arc; - -use crate::connectors::DataEventType; -use crate::connectors::{ReadError, ReadResult}; +use crate::connectors::metadata::SourceMetadata; +use crate::connectors::ReadError; +use crate::persistence::cached_object_storage::CachedObjectStorage; pub mod filesystem; pub mod s3; @@ -13,32 +11,28 @@ pub use filesystem::FilesystemScanner; #[allow(clippy::module_name_repetitions)] pub use s3::S3Scanner; +#[derive(Clone, Debug)] +pub enum QueuedAction { + Read(Vec, SourceMetadata), + Update(Vec, SourceMetadata), + Delete(Vec), +} + +impl QueuedAction { + pub fn path(&self) -> &[u8] { + match self { + Self::Read(path, _) | Self::Update(path, _) | Self::Delete(path) => path, + } + } +} + #[allow(clippy::module_name_repetitions)] pub trait PosixLikeScanner: Send { - /// Returns the current events type: whether it is data to be added - /// or data to be removed. - fn data_event_type(&self) -> Option; - - /// Returns a reader for the currently selected object if there - /// is an object selected for reading. - fn current_object_reader( + fn object_metadata(&mut self, object_path: &[u8]) -> Result, ReadError>; + fn read_object(&mut self, object_path: &[u8]) -> Result, ReadError>; + fn next_scanner_actions( &mut self, - ) -> Result>, ReadError>; - - /// Returns the name of the currently processed file in the input directory. It will - /// be used in the offset that will be produced by a reader. - fn current_offset_object(&self) -> Option>; - - /// Performs a seek on a directory-like structure to resume reading after the - /// object with a given name. - fn seek_to_object(&mut self, seek_object_path: &[u8]) -> Result<(), ReadError>; - - /// Finish reading the current file and find the next one to read from. - /// If there is a file to read from, the method returns a `ReadResult` - /// specifying the action to be provided downstream. - /// - /// It can either be a `NewSource` event when the new action is found or - /// a `FinishedSource` event when we've had an ongoing action and it needs - /// to be finalized. 
- fn next_scanner_action(&mut self) -> Result, ReadError>; + are_deletions_enabled: bool, + cached_object_storage: &CachedObjectStorage, + ) -> Result, ReadError>; } diff --git a/src/connectors/scanner/s3.rs b/src/connectors/scanner/s3.rs index 5b05d4f8..6809e27e 100644 --- a/src/connectors/scanner/s3.rs +++ b/src/connectors/scanner/s3.rs @@ -1,33 +1,48 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::collections::VecDeque; use std::fmt::Debug; -use std::io::Cursor; -use std::io::Read; -use std::mem::take; use std::str::from_utf8; -use std::sync::Arc; -use std::thread::sleep; -use std::time::{Duration, SystemTime}; +use std::time::SystemTime; use arcstr::ArcStr; -use chrono::DateTime; -use log::{error, info}; +use log::{info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use rayon::{ThreadPool, ThreadPoolBuilder}; -use crate::connectors::data_storage::ConnectorMode; -use crate::connectors::scanner::PosixLikeScanner; -use crate::connectors::{DataEventType, ReadError, ReadResult}; -use crate::deepcopy::DeepCopy; +use crate::connectors::metadata::SourceMetadata; +use crate::connectors::scanner::{PosixLikeScanner, QueuedAction}; +use crate::connectors::ReadError; +use crate::persistence::cached_object_storage::CachedObjectStorage; use crate::retry::{execute_with_retries, RetryConfig}; use s3::bucket::Bucket as S3Bucket; use s3::request::request_trait::ResponseData as S3ResponseData; +use s3::serde_types::ListBucketResult as S3ListBucketResult; const MAX_S3_RETRIES: usize = 2; const S3_PATH_PREFIXES: [&str; 2] = ["s3://", "s3a://"]; -type S3DownloadedObject = (ArcStr, Cursor>); + +struct S3DownloadedObject { + path: ArcStr, + contents: Vec, + metadata: Option, +} + +impl S3DownloadedObject { + fn new(path: ArcStr, contents: Vec, metadata: Option) -> Self { + Self { + path, + contents, + metadata, + } + } + + fn set_metadata(mut self, metadata: SourceMetadata) -> Self { + self.metadata = Some(metadata); + self + } +} + type S3DownloadResult = Result; #[derive(Debug)] @@ -50,137 +65,106 @@ pub struct S3Scanner { */ bucket: S3Bucket, objects_prefix: String, - streaming_mode: ConnectorMode, - current_object_path: Option, - processed_objects: HashSet, - objects_for_processing: VecDeque, + pending_modifications: HashMap>, downloader_pool: ThreadPool, - predownloaded_objects: HashMap>>, - is_queue_initialized: bool, } impl PosixLikeScanner for S3Scanner { - fn data_event_type(&self) -> Option { - if self.current_object_path.is_some() { - Some(DataEventType::Insert) - } else { - None + fn object_metadata(&mut self, object_path: &[u8]) -> Result, ReadError> { + let path = from_utf8(object_path).expect("S3 path are expected to be UTF-8 strings"); + let object_lists = execute_with_retries( + || self.bucket.list(path.to_string(), None), + RetryConfig::default(), + MAX_S3_RETRIES, + ) + .map_err(|e| ReadError::S3(S3CommandName::ListObjectsV2, e))?; + for list in object_lists { + for object in &list.contents { + if object.key != path { + continue; + } + let metadata = SourceMetadata::from_s3_object(object); + if metadata.modified_at.is_some() { + return Ok(Some(metadata)); + } + } } + Ok(None) } - fn current_object_reader( - &mut self, - ) -> Result>, ReadError> { - let Some(path) = self.current_object_path.clone() else { - return Ok(None); - }; - let selected_object_path = from_utf8(path.as_bytes()) - .unwrap_or_else(|_| panic!("s3 path is not UTF-8 serializable: {path:?}")) - .to_string(); - if let Some(prepared_object) = 
self.predownloaded_objects.remove(&selected_object_path) { - let reader = Box::new(prepared_object); - Ok(Some(reader)) + fn read_object(&mut self, object_path: &[u8]) -> Result, ReadError> { + let path = from_utf8(object_path).expect("S3 path are expected to be UTF-8 strings"); + if let Some(prepared_object) = self.pending_modifications.remove(path) { + Ok(prepared_object) } else { - // Expected to happen once for each case of persistent recovery. - // If repeats multiple times, something is wrong. - info!("No prepared object for {selected_object_path}. Downloading directly from S3."); - let (_, object_contents) = - Self::stream_object_from_path_and_bucket(&selected_object_path, &self.bucket)?; - let reader = Box::new(object_contents); - Ok(Some(reader)) + let downloaded_object = Self::stream_object_from_path_and_bucket(path, &self.bucket)?; + Ok(downloaded_object.contents) } } - fn current_offset_object(&self) -> Option> { - self.current_object_path - .clone() - .map(|x| x.as_bytes().into()) - } - - fn seek_to_object(&mut self, path: &[u8]) -> Result<(), ReadError> { - let path: String = from_utf8(path) - .unwrap_or_else(|_| panic!("s3 path is not UTF-8 serializable: {path:?}")) - .to_string(); - self.processed_objects.clear(); - - /* - S3 bucket-list calls are considered expensive, because of that we do one. - Then, two linear passes detect the objects which should be marked. - */ - let object_lists = execute_with_retries( + fn next_scanner_actions( + &mut self, + are_deletions_enabled: bool, + cached_object_storage: &CachedObjectStorage, + ) -> Result, ReadError> { + let object_lists: Vec = execute_with_retries( || self.bucket.list(self.objects_prefix.to_string(), None), RetryConfig::default(), MAX_S3_RETRIES, ) .map_err(|e| ReadError::S3(S3CommandName::ListObjectsV2, e))?; - - let mut threshold_modification_time = None; - for list in &object_lists { + let mut result = Vec::new(); + let mut seen_object_keys = HashSet::new(); + let mut pending_modification_download_tasks = Vec::new(); + for list in object_lists { for object in &list.contents { - if object.key == path { - let Ok(last_modified) = DateTime::parse_from_rfc3339(&object.last_modified) - else { - continue; - }; - threshold_modification_time = Some(last_modified); + seen_object_keys.insert(object.key.clone()); + let actual_metadata = SourceMetadata::from_s3_object(object); + let object_key = object.key.as_bytes(); + if let Some(stored_metadata) = cached_object_storage.stored_metadata(object_key) { + let is_updated = stored_metadata.is_changed(&actual_metadata); + if is_updated && are_deletions_enabled { + pending_modification_download_tasks.push(actual_metadata); + } + } else { + pending_modification_download_tasks.push(actual_metadata); } } } - if let Some(threshold_modification_time) = threshold_modification_time { - let path = path.to_string(); - for list in object_lists { - for object in list.contents { - let Ok(last_modified) = DateTime::parse_from_rfc3339(&object.last_modified) - else { - continue; - }; - if (last_modified, &object.key) < (threshold_modification_time, &path) { - self.processed_objects.insert(object.key); + let pending_modification_objects = self.download_bulk(&pending_modification_download_tasks); + for object in pending_modification_objects { + match object { + Ok(object) => { + let object_path = object.path.to_string(); + let is_update = cached_object_storage.contains_object(object_path.as_bytes()); + if is_update { + result.push(QueuedAction::Update( + object_path.as_bytes().into(), + 
object.metadata.as_ref().unwrap().clone(), + )); + } else { + result.push(QueuedAction::Read( + object_path.as_bytes().into(), + object.metadata.as_ref().unwrap().clone(), + )); } + self.pending_modifications + .insert(object_path.clone(), object.contents); + } + Err(e) => { + warn!("Failed to fetch the modified version of the object: {e}. It will be retried with the next bulk of updates."); } } - self.processed_objects.insert(path.clone()); - self.current_object_path = Some(path.into()); } - - Ok(()) - } - - fn next_scanner_action(&mut self) -> Result, ReadError> { - if take(&mut self.current_object_path).is_some() { - return Ok(Some(ReadResult::FinishedSource { - commit_allowed: true, - })); - } - - let is_polling_enabled = self.streaming_mode.is_polling_enabled(); - loop { - if let Some(selected_object_path) = self.objects_for_processing.pop_front() { - if self - .predownloaded_objects - .contains_key(&selected_object_path) - { - self.processed_objects.insert(selected_object_path.clone()); - self.current_object_path = Some(selected_object_path.into()); - return Ok(Some(ReadResult::NewSource(None))); - } - } else { - let is_queue_refresh_needed = !self.is_queue_initialized - || (self.objects_for_processing.is_empty() && is_polling_enabled); - if is_queue_refresh_needed { - self.find_new_objects_for_processing()?; - if self.objects_for_processing.is_empty() && is_polling_enabled { - // Even after the queue refresh attempt, it's still empty - // Sleep before the next poll - sleep(Self::sleep_duration()); - } - } else { - // No elements and no further queue refreshes, - // the connector can stop at this point - return Ok(None); + if are_deletions_enabled { + for (object_path, _) in cached_object_storage.get_iter() { + let object_path = from_utf8(object_path).expect("S3 paths must be UTF8-compatible"); + if !seen_object_keys.contains(object_path) { + result.push(QueuedAction::Delete(object_path.as_bytes().into())); } } } + Ok(result) } } @@ -189,7 +173,6 @@ impl S3Scanner { pub fn new( bucket: S3Bucket, objects_prefix: impl Into, - streaming_mode: ConnectorMode, downloader_threads_count: usize, ) -> Result { let objects_prefix = objects_prefix.into(); @@ -215,17 +198,11 @@ impl S3Scanner { Ok(S3Scanner { bucket, objects_prefix, - streaming_mode, - - current_object_path: None, - processed_objects: HashSet::new(), - objects_for_processing: VecDeque::new(), downloader_pool: ThreadPoolBuilder::new() .num_threads(downloader_threads_count) .build() - .expect("Failed to create downloader pool"), // TODO: configure number of threads - predownloaded_objects: HashMap::new(), - is_queue_initialized: false, + .expect("Failed to create downloader pool"), + pending_modifications: HashMap::new(), }) } @@ -256,88 +233,36 @@ impl S3Scanner { .map_err(|e| ReadError::S3(S3CommandName::GetObject, e)) } - pub fn stream_object_from_path_and_bucket( + fn stream_object_from_path_and_bucket( object_path_ref: &str, bucket: &S3Bucket, ) -> S3DownloadResult { let object_path = object_path_ref.to_string(); let response = Self::download_object_from_path_and_bucket(&object_path, bucket)?; - let readable_data = Cursor::new(response.bytes().to_vec()); - Ok((object_path_ref.to_string().into(), readable_data)) - } - pub fn stream_object_from_path( - &mut self, - object_path_ref: &str, - ) -> Result>, ReadError> { - let (current_object_path, reader_impl) = - Self::stream_object_from_path_and_bucket(object_path_ref, &self.bucket.deep_copy())?; - self.current_object_path = Some(current_object_path); - Ok(reader_impl) + 
Ok(S3DownloadedObject::new( + object_path_ref.to_string().into(), + response.bytes().to_vec(), + None, + )) } - fn find_new_objects_for_processing(&mut self) -> Result<(), ReadError> { - let object_lists = execute_with_retries( - || self.bucket.list(self.objects_prefix.to_string(), None), - RetryConfig::default(), - MAX_S3_RETRIES, - ) - .map_err(|e| ReadError::S3(S3CommandName::ListObjectsV2, e))?; - - let mut new_objects = Vec::new(); - for list in &object_lists { - for object in &list.contents { - if self.processed_objects.contains(&object.key) { - continue; - } - let Ok(last_modified) = DateTime::parse_from_rfc3339(&object.last_modified) else { - continue; - }; - new_objects.push((last_modified, object.key.clone())); - } + fn download_bulk(&mut self, new_objects: &[SourceMetadata]) -> Vec { + if new_objects.is_empty() { + return Vec::with_capacity(0); } - new_objects.sort_unstable(); - info!("Found {} new objects to process", new_objects.len()); + info!("Downloading a bulk of {} objects", new_objects.len()); let downloading_started_at = SystemTime::now(); let new_objects_downloaded: Vec = self.downloader_pool.install(|| { new_objects .par_iter() - .map(|(_, path)| Self::stream_object_from_path_and_bucket(path, &self.bucket)) + .map(|task| { + Self::stream_object_from_path_and_bucket(&task.path, &self.bucket) + .map(|result| result.set_metadata(task.clone())) + }) .collect() }); info!("Downloading done in {:?}", downloading_started_at.elapsed()); - - for downloaded_object in new_objects_downloaded { - match downloaded_object { - Ok((path, prepared_reader)) => { - self.predownloaded_objects - .insert(path.to_string(), prepared_reader); - } - Err(e) => { - error!("Error while downloading an object from S3: {e}"); - } - } - } - for (_, object_key) in new_objects { - self.objects_for_processing.push_back(object_key); - } - - info!( - "The {} new objects have been enqueued for further processing", - self.objects_for_processing.len() - ); - self.is_queue_initialized = true; - Ok(()) - } - - pub fn expect_current_object_path(&self) -> ArcStr { - self.current_object_path - .as_ref() - .expect("current object should be present") - .clone() - } - - fn sleep_duration() -> Duration { - Duration::from_millis(10000) + new_objects_downloaded } } diff --git a/src/engine/dataflow/persist.rs b/src/engine/dataflow/persist.rs index b9ad5a6b..daed041e 100644 --- a/src/engine/dataflow/persist.rs +++ b/src/engine/dataflow/persist.rs @@ -315,7 +315,7 @@ where fn read_persisted_state( name: &str, mut scope: S, - reader: Box + Send>, + mut reader: Box + Send>, ) -> (Collection, Poller, thread::JoinHandle<()>) where S: Scope, diff --git a/src/fs_helpers.rs b/src/fs_helpers.rs index 64caf26e..d2cb8e04 100644 --- a/src/fs_helpers.rs +++ b/src/fs_helpers.rs @@ -5,7 +5,7 @@ use std::path::Path; pub fn ensure_directory(fs_path: &Path) -> Result<(), Error> { if !fs_path.exists() { - if let Err(e) = std::fs::create_dir(fs_path) { + if let Err(e) = std::fs::create_dir_all(fs_path) { if e.kind() == ErrorKind::AlreadyExists { return Ok(()); } diff --git a/src/persistence/backends/file.rs b/src/persistence/backends/file.rs index 8583be31..0d735114 100644 --- a/src/persistence/backends/file.rs +++ b/src/persistence/backends/file.rs @@ -86,16 +86,13 @@ impl PersistenceBackend for FilesystemKVStorage { .join(key.to_owned() + TEMPORARY_OBJECT_SUFFIX); let final_path = self.root_path.join(key); let put_value_result = Self::write_file(&tmp_path, &final_path, &value); - let send_result = sender.send(put_value_result); - if let 
Err(unsent_flush_result) = send_result { - error!( - "The receiver no longer waits for the result of this save: {unsent_flush_result:?}" - ); - } + sender + .send(put_value_result) + .expect("The receiver must still be listening for the result of the put_value"); receiver } - fn remove_key(&self, key: &str) -> Result<(), Error> { + fn remove_key(&mut self, key: &str) -> Result<(), Error> { std::fs::remove_file(self.root_path.join(key))?; Ok(()) } diff --git a/src/persistence/backends/memory.rs b/src/persistence/backends/memory.rs new file mode 100644 index 00000000..d7072096 --- /dev/null +++ b/src/persistence/backends/memory.rs @@ -0,0 +1,69 @@ +// Copyright © 2024 Pathway + +use log::{error, warn}; +use std::collections::HashMap; + +use futures::channel::oneshot; +use futures::channel::oneshot::Receiver as OneShotReceiver; + +use crate::persistence::backends::PersistenceBackend; +use crate::persistence::Error; + +// Memory KV-storage. Must not be available for the user. +// Used as a backend for `CachedObjectStorage` when persistence is not enabled. +#[derive(Debug, Default)] +#[allow(clippy::module_name_repetitions)] +pub struct MemoryKVStorage { + values: HashMap>, +} + +#[derive(Debug, thiserror::Error)] +#[allow(clippy::module_name_repetitions)] +pub enum MemoryKVStorageError { + #[error("key not found: {0}")] + NoSuchKey(String), +} + +impl MemoryKVStorage { + pub fn new() -> Self { + Self { + values: HashMap::new(), + } + } +} + +impl PersistenceBackend for MemoryKVStorage { + fn list_keys(&self) -> Result, Error> { + let keys = self.values.keys(); + let keys_owned: Vec = keys.cloned().collect(); + Ok(keys_owned) + } + + fn get_value(&self, key: &str) -> Result, Error> { + let value = self.values.get(key); + if let Some(value) = value { + Ok(value.clone()) + } else { + let storage_error = MemoryKVStorageError::NoSuchKey(key.into()); + Err(Error::Memory(storage_error)) + } + } + + fn put_value(&mut self, key: &str, value: Vec) -> OneShotReceiver> { + let (sender, receiver) = oneshot::channel(); + sender + .send(Ok(())) + .expect("The receiver must still be listening for the result of the put_value"); + self.values.insert(key.to_string(), value); + receiver + } + + fn remove_key(&mut self, key: &str) -> Result<(), Error> { + if self.values.remove(key).is_none() { + warn!( + "The key '{key}' was requested to be removed, but it's not available in the cache." 
+ ); + } + Ok(()) + } +} diff --git a/src/persistence/backends/mock.rs b/src/persistence/backends/mock.rs index fc7e0be8..1311b2d7 100644 --- a/src/persistence/backends/mock.rs +++ b/src/persistence/backends/mock.rs @@ -1,7 +1,5 @@ // Copyright © 2024 Pathway -use log::error; - use futures::channel::oneshot; use crate::persistence::backends::PersistenceBackend; @@ -24,16 +22,13 @@ impl PersistenceBackend for MockKVStorage { fn put_value(&mut self, _key: &str, _value: Vec) -> BackendPutFuture { let (sender, receiver) = oneshot::channel(); - let send_result = sender.send(Ok(())); - if let Err(unsent_flush_result) = send_result { - error!( - "The receiver no longer waits for the result of this save: {unsent_flush_result:?}" - ); - } + sender + .send(Ok(())) + .expect("The receiver must still be listening for the result of the put_value"); receiver } - fn remove_key(&self, _key: &str) -> Result<(), Error> { + fn remove_key(&mut self, _key: &str) -> Result<(), Error> { Ok(()) } } diff --git a/src/persistence/backends/mod.rs b/src/persistence/backends/mod.rs index 9e1dc3b4..9a4ec11e 100644 --- a/src/persistence/backends/mod.rs +++ b/src/persistence/backends/mod.rs @@ -10,10 +10,12 @@ use futures::channel::oneshot::Receiver as OneShotReceiver; use serde_json::Error as JsonParseError; pub use file::FilesystemKVStorage; +pub use memory::{MemoryKVStorage, MemoryKVStorageError}; pub use mock::MockKVStorage; pub use s3::S3KVStorage; pub mod file; +pub mod memory; pub mod mock; pub mod s3; @@ -26,6 +28,9 @@ pub enum Error { #[error(transparent)] S3(#[from] S3Error), + #[error(transparent)] + Memory(#[from] MemoryKVStorageError), + #[error(transparent)] Utf8(#[from] Utf8Error), @@ -50,5 +55,5 @@ pub trait PersistenceBackend: Send + Debug { fn put_value(&mut self, key: &str, value: Vec) -> BackendPutFuture; /// Remove the value corresponding to the `key`. 
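Aside (not part of the diff): every backend now exposes the same `put_value` shape, returning a oneshot receiver that the caller awaits, and the in-memory backend can fulfil it synchronously. Below is a compilable sketch of that contract, assuming only the `futures` crate; the storage struct and the error type are simplified stand-ins rather than the crate's own types.

```rust
// Minimal sketch of the oneshot-based put_value contract used by the backends.
use std::collections::HashMap;

use futures::channel::oneshot;
use futures::channel::oneshot::Receiver;

type PutResult = Result<(), String>; // stand-in for the crate's persistence error type

#[derive(Default)]
struct TinyMemoryStore {
    values: HashMap<String, Vec<u8>>,
}

impl TinyMemoryStore {
    fn put_value(&mut self, key: &str, value: Vec<u8>) -> Receiver<PutResult> {
        let (sender, receiver) = oneshot::channel();
        // The write itself is synchronous, so the result can be sent right away;
        // callers still see the same future-shaped interface as the S3 and
        // filesystem backends.
        sender
            .send(Ok(()))
            .expect("the receiver is returned below, so it is still alive");
        self.values.insert(key.to_string(), value);
        receiver
    }
}

fn main() {
    let mut store = TinyMemoryStore::default();
    let pending = store.put_value("frontier", b"42".to_vec());
    let result = futures::executor::block_on(pending).expect("sender is never dropped unsent");
    assert!(result.is_ok());
}
```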
- fn remove_key(&self, key: &str) -> Result<(), Error>; + fn remove_key(&mut self, key: &str) -> Result<(), Error>; } diff --git a/src/persistence/backends/s3.rs b/src/persistence/backends/s3.rs index 3ef87426..73ffbff3 100644 --- a/src/persistence/backends/s3.rs +++ b/src/persistence/backends/s3.rs @@ -142,7 +142,7 @@ impl PersistenceBackend for S3KVStorage { receiver } - fn remove_key(&self, key: &str) -> Result<(), Error> { + fn remove_key(&mut self, key: &str) -> Result<(), Error> { let full_key_path = self.full_key_path(key); let _ = self.bucket.delete_object(full_key_path)?; Ok(()) diff --git a/src/persistence/cached_object_storage.rs b/src/persistence/cached_object_storage.rs new file mode 100644 index 00000000..cd36292f --- /dev/null +++ b/src/persistence/cached_object_storage.rs @@ -0,0 +1,153 @@ +use std::collections::hash_map::Iter; +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use xxhash_rust::xxh3::Xxh3 as Hasher; + +use crate::connectors::metadata::SourceMetadata; +use crate::persistence::backends::{Error as PersistenceError, PersistenceBackend}; + +const BLOB_EXTENSION: &str = ".blob"; +const METADATA_EXTENSION: &str = ".metadata"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MetadataEntry { + uri: Vec, + metadata: SourceMetadata, +} + +impl MetadataEntry { + pub fn new(uri: Vec, metadata: SourceMetadata) -> Self { + Self { uri, metadata } + } + + pub fn parse(bytes: &[u8]) -> Result { + let data = std::str::from_utf8(bytes)?; + let result = serde_json::from_str::(data.trim_end()) + .map_err(|e| PersistenceError::IncorrectMetadataFormat(data.to_string(), e))?; + Ok(result) + } + + pub fn serialize(&self) -> String { + serde_json::to_string(&self).unwrap() + } +} + +#[derive(Debug)] +pub struct CachedObjectStorage { + backend: Box, + known_objects: HashMap, SourceMetadata>, +} + +impl CachedObjectStorage { + pub fn new(backend: Box) -> Result { + let mut known_objects = HashMap::new(); + let keys = backend.list_keys()?; + + // If slow, use thread pool + for key in keys { + if !key.ends_with(METADATA_EXTENSION) { + continue; + } + let object = backend.get_value(&key)?; + let entry = MetadataEntry::parse(&object)?; + known_objects.insert(entry.uri, entry.metadata); + } + + Ok(Self { + backend, + known_objects, + }) + } + + pub fn clear(&mut self) -> Result<(), PersistenceError> { + let keys: Vec> = self.known_objects.keys().cloned().collect(); + for uri in keys { + self.remove_object_from_storage(&uri)?; + } + self.known_objects.clear(); + Ok(()) + } + + pub fn place_object( + &mut self, + uri: &[u8], + contents: Vec, + metadata: SourceMetadata, + ) -> Result<(), PersistenceError> { + let object_key = Self::cached_object_path(uri); + let metadata_key = Self::metadata_path(uri); + + futures::executor::block_on(async { + self.backend.put_value(&object_key, contents).await.unwrap() + })?; + + // The binary object and the metadata must be written one by one. + // This ensures that if there is a metadata object for a certain key, + // it is guaranteed to be fully loaded into the storage. + // On the contrary, if two futures are waited for simultaneously, + // it is possible that the binary object uploading future fails, + // while the metadata uploading future succeeds. This would lead to a situation where, + // upon recovery, a certain object is considered present in the storage + // but causes an error during an actual acquisition attempt. 
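Aside (not part of the diff): this ordering matters because, on startup, `CachedObjectStorage::new` rebuilds its in-memory index purely from the `*.metadata` entries, so a metadata entry must imply a fully written blob. A self-contained sketch of that recovery pass, assuming `serde`/`serde_json` as in the real code; the struct fields and helper names here are illustrative.

```rust
// Sketch of the recovery pass: list backend keys, parse only "*.metadata" entries,
// and rebuild the uri -> metadata index. Types are simplified stand-ins.
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

const METADATA_EXTENSION: &str = ".metadata";

#[derive(Debug, Clone, Serialize, Deserialize)]
struct SourceMetadata {
    path: String, // assumed fields for illustration
    size: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct MetadataEntry {
    uri: Vec<u8>,
    metadata: SourceMetadata,
}

fn rebuild_index(
    keys: &[String],
    get_value: impl Fn(&str) -> Vec<u8>,
) -> Result<HashMap<Vec<u8>, SourceMetadata>, serde_json::Error> {
    let mut known_objects = HashMap::new();
    for key in keys {
        // Blob entries ("*.blob") are skipped; only metadata entries drive the index.
        if !key.ends_with(METADATA_EXTENSION) {
            continue;
        }
        let raw = get_value(key);
        let entry: MetadataEntry = serde_json::from_slice(&raw)?;
        known_objects.insert(entry.uri, entry.metadata);
    }
    Ok(known_objects)
}

fn main() -> Result<(), serde_json::Error> {
    let entry = MetadataEntry {
        uri: b"s3://bucket/a.txt".to_vec(),
        metadata: SourceMetadata { path: "s3://bucket/a.txt".into(), size: 3 },
    };
    let stored = serde_json::to_vec(&entry)?;
    let keys = vec!["1234.blob".to_string(), "1234.metadata".to_string()];
    let index = rebuild_index(&keys, |key| {
        if key.ends_with(METADATA_EXTENSION) { stored.clone() } else { b"abc".to_vec() }
    })?;
    assert!(index.contains_key(b"s3://bucket/a.txt".as_slice()));
    Ok(())
}
```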
+ let metadata_entry = MetadataEntry::new(uri.to_vec(), metadata.clone()); + futures::executor::block_on(async { + let serialized_entry = metadata_entry.serialize(); + self.backend + .put_value(&metadata_key, serialized_entry.as_bytes().to_vec()) + .await + .unwrap() + })?; + self.known_objects.insert(uri.to_vec(), metadata); + + Ok(()) + } + + pub fn remove_object(&mut self, uri: &[u8]) -> Result<(), PersistenceError> { + self.remove_object_from_storage(uri)?; + self.known_objects.remove(uri); + Ok(()) + } + + pub fn contains_object(&self, uri: &[u8]) -> bool { + self.known_objects.contains_key(uri) + } + + pub fn get_iter(&self) -> Iter, SourceMetadata> { + self.known_objects.iter() + } + + pub fn stored_metadata(&self, uri: &[u8]) -> Option<&SourceMetadata> { + self.known_objects.get(uri) + } + + pub fn get_object(&self, uri: &[u8]) -> Result, PersistenceError> { + let object_key = Self::cached_object_path(uri); + self.backend.get_value(&object_key) + } + + // Below are helper methods + + fn remove_object_from_storage(&mut self, uri: &[u8]) -> Result<(), PersistenceError> { + let object_key = Self::cached_object_path(uri); + let metadata_key = Self::metadata_path(uri); + self.backend.remove_key(&metadata_key)?; + self.backend.remove_key(&object_key) + } + + fn uri_hash(uri: &[u8]) -> String { + let mut hasher = Hasher::default(); + hasher.update(uri); + format!("{}", hasher.digest128()) + } + + fn cached_object_path(uri: &[u8]) -> String { + let uri_hash = Self::uri_hash(uri); + format!("{uri_hash}{BLOB_EXTENSION}") + } + + fn metadata_path(uri: &[u8]) -> String { + let uri_hash = Self::uri_hash(uri); + format!("{uri_hash}{METADATA_EXTENSION}") + } +} diff --git a/src/persistence/config.rs b/src/persistence/config.rs index b6dbb41c..e63ff025 100644 --- a/src/persistence/config.rs +++ b/src/persistence/config.rs @@ -26,6 +26,7 @@ use crate::fs_helpers::ensure_directory; use crate::persistence::backends::{ FilesystemKVStorage, MockKVStorage, PersistenceBackend, S3KVStorage, }; +use crate::persistence::cached_object_storage::CachedObjectStorage; use crate::persistence::operator_snapshot::{ConcreteSnapshotReader, MultiConcreteSnapshotReader}; use crate::persistence::state::MetadataAccessor; use crate::persistence::Error as PersistenceBackendError; @@ -162,6 +163,29 @@ impl PersistenceManagerConfig { } } + pub fn create_cached_object_storage( + &self, + persistent_id: PersistentId, + ) -> Result { + let backend: Box = match &self.backend { + PersistentStorageConfig::Filesystem(root_path) => { + let storage_root_path = + root_path.join(format!("cached-objects-storage/{persistent_id}")); + ensure_directory(&storage_root_path)?; + Box::new(FilesystemKVStorage::new(&storage_root_path)?) 
+ } + PersistentStorageConfig::S3 { bucket, root_path } => { + let storage_root_path = format!( + "{}/cached-objects-storage/{persistent_id}", + root_path.strip_suffix('/').unwrap_or(root_path), + ); + Box::new(S3KVStorage::new(bucket.deep_copy(), &storage_root_path)) + } + PersistentStorageConfig::Mock(_) => Box::new(MockKVStorage {}), + }; + CachedObjectStorage::new(backend) + } + pub fn create_metadata_storage(&self) -> Result { let backend = self.backend.create()?; MetadataAccessor::new(backend, self.worker_id, self.total_workers) @@ -269,9 +293,7 @@ impl PersistenceManagerConfig { root_path: &Path, persistent_id: PersistentId, ) -> Result { - ensure_directory(root_path)?; let streams_path = root_path.join(STREAMS_DIRECTORY_NAME); - ensure_directory(&streams_path)?; let worker_path = streams_path.join(self.worker_id.to_string()); ensure_directory(&worker_path)?; Ok(worker_path.join(persistent_id.to_string())) @@ -292,8 +314,6 @@ impl PersistenceManagerConfig { persistent_id: PersistentId, query_purpose: ReadersQueryPurpose, ) -> Result, PersistenceBackendError> { - ensure_directory(root_path)?; - let streams_dir = root_path.join("streams"); ensure_directory(&streams_dir)?; diff --git a/src/persistence/frontier.rs b/src/persistence/frontier.rs index 9609890b..66135dd8 100644 --- a/src/persistence/frontier.rs +++ b/src/persistence/frontier.rs @@ -40,6 +40,10 @@ impl OffsetAntichain { pub fn iter(&self) -> hash_map::Iter<'_, OffsetKey, OffsetValue> { self.antichain.iter() } + + pub fn is_empty(&self) -> bool { + self.antichain.is_empty() + } } impl<'a> IntoIterator for &'a OffsetAntichain { diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index 78cf182b..0923a842 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -11,6 +11,7 @@ use xxhash_rust::xxh3::Xxh3 as Hasher; use crate::engine::Timestamp; pub mod backends; +pub mod cached_object_storage; pub mod config; pub mod frontier; pub mod input_snapshot; diff --git a/src/persistence/operator_snapshot.rs b/src/persistence/operator_snapshot.rs index a67c185b..e2d20b4b 100644 --- a/src/persistence/operator_snapshot.rs +++ b/src/persistence/operator_snapshot.rs @@ -16,7 +16,7 @@ use crate::persistence::PersistenceTime; #[allow(clippy::module_name_repetitions)] pub trait OperatorSnapshotReader { - fn load_persisted(&self) -> Result, BackendError>; + fn load_persisted(&mut self) -> Result, BackendError>; } #[allow(clippy::module_name_repetitions)] @@ -149,7 +149,7 @@ where D: ExchangeData, R: ExchangeData, { - fn load_persisted(&self) -> Result, BackendError> { + fn load_persisted(&mut self) -> Result, BackendError> { let keys = self.backend.list_keys()?; let chunks = get_chunks(keys, self.threshold_time); for chunk in itertools::chain(chunks.too_old.iter(), chunks.too_new.iter()) { @@ -185,9 +185,9 @@ where D: ExchangeData, R: ExchangeData + Semigroup, { - fn load_persisted(&self) -> Result, BackendError> { + fn load_persisted(&mut self) -> Result, BackendError> { let mut result = Vec::new(); - for snapshot_reader in &self.snapshot_readers { + for snapshot_reader in &mut self.snapshot_readers { let mut v = snapshot_reader.load_persisted()?; if v.len() > result.len() { swap(&mut result, &mut v); diff --git a/src/persistence/state.rs b/src/persistence/state.rs index 654793d8..943c3965 100644 --- a/src/persistence/state.rs +++ b/src/persistence/state.rs @@ -159,7 +159,7 @@ impl VersionInformation { impl MetadataAccessor { pub fn new( - backend: Box, + mut backend: Box, worker_id: usize, total_workers: usize, ) -> Result 
{ diff --git a/src/persistence/tracker.rs b/src/persistence/tracker.rs index fd3425da..0f4f252e 100644 --- a/src/persistence/tracker.rs +++ b/src/persistence/tracker.rs @@ -10,6 +10,7 @@ use std::sync::{Arc, Mutex}; use crate::connectors::PersistenceMode; use crate::engine::{Timestamp, TotalFrontier}; use crate::persistence::backends::BackendPutFuture as PersistenceBackendFlushFuture; +use crate::persistence::cached_object_storage::CachedObjectStorage; use crate::persistence::config::{PersistenceManagerConfig, ReadersQueryPurpose}; use crate::persistence::input_snapshot::{ReadInputSnapshot, SnapshotMode}; use crate::persistence::operator_snapshot::{Flushable, OperatorSnapshotReader}; @@ -107,6 +108,13 @@ impl WorkerPersistentStorage { }) } + pub fn create_cached_object_storage( + &self, + persistent_id: PersistentId, + ) -> Result { + self.config.create_cached_object_storage(persistent_id) + } + pub fn table_persistence_enabled(&self) -> bool { matches!( self.config.persistence_mode, diff --git a/tests/integration/helpers.rs b/tests/integration/helpers.rs index a6db647e..be967426 100644 --- a/tests/integration/helpers.rs +++ b/tests/integration/helpers.rs @@ -104,12 +104,18 @@ pub fn full_cycle_read( let event = event.replace_errors(); if let Some(ref mut snapshot_writer) = snapshot_writer { let snapshot_event = match event { + // Random key generation is used only for testing purposes and + // doesn't reflect the logic used in pathway applications. ParsedEvent::Insert((_, ref values)) => { let key = Key::random(); SnapshotEvent::Insert(key, values.clone()) } - ParsedEvent::Delete((_, _)) | ParsedEvent::Upsert((_, _)) => { - todo!("delete and upsert aren't supported in this test") + ParsedEvent::Delete((_, ref values)) => { + let key = Key::random(); + SnapshotEvent::Delete(key, values.clone()) + } + ParsedEvent::Upsert((_, _)) => { + todo!("upsert aren't supported in this test") } ParsedEvent::AdvanceTime => { SnapshotEvent::AdvanceTime(Timestamp(1), frontier.clone()) diff --git a/tests/integration/test_operator_persistence.rs b/tests/integration/test_operator_persistence.rs index 702dd9f7..b4b62749 100644 --- a/tests/integration/test_operator_persistence.rs +++ b/tests/integration/test_operator_persistence.rs @@ -47,7 +47,7 @@ where D: Clone, R: Clone, { - fn load_persisted(&self) -> Result, BackendError> { + fn load_persisted(&mut self) -> Result, BackendError> { Ok(self.data.clone()) } } @@ -377,7 +377,7 @@ mock! 
{ fn list_keys(&self) -> Result, BackendError>; fn get_value(&self, key: &str) -> Result, BackendError>; fn put_value(&mut self, key: &str, value: Vec) -> BackendPutFuture; - fn remove_key(&self, key: &str) -> Result<(), BackendError>; + fn remove_key(&mut self, key: &str) -> Result<(), BackendError>; } } @@ -417,7 +417,7 @@ fn test_operator_snapshot_reader_reads_correct_files_1() { .times(1) .returning(|_key| Ok(())); } - let reader = MultiConcreteSnapshotReader::new(vec![ConcreteSnapshotReader::new( + let mut reader = MultiConcreteSnapshotReader::new(vec![ConcreteSnapshotReader::new( Box::new(backend), TotalFrontier::At(Timestamp(34)), )]); @@ -477,7 +477,7 @@ fn test_operator_snapshot_reader_consolidates() { .times(1) .returning(|_key| Ok(())); } - let reader = MultiConcreteSnapshotReader::new(vec![ConcreteSnapshotReader::new( + let mut reader = MultiConcreteSnapshotReader::new(vec![ConcreteSnapshotReader::new( Box::new(backend), TotalFrontier::At(Timestamp(22)), )]); diff --git a/tests/integration/test_seek.rs b/tests/integration/test_seek.rs index 975aafdb..47128762 100644 --- a/tests/integration/test_seek.rs +++ b/tests/integration/test_seek.rs @@ -122,6 +122,22 @@ fn test_csv_file_recovery() -> eyre::Result<()> { assert_eq!( data_stream.new_parsed_entries, vec![ + ParsedEvent::Delete(( + Some(vec![Value::String("1".into())]), + vec![Value::String("2".into())] + )), + ParsedEvent::Delete(( + Some(vec![Value::String("a".into())]), + vec![Value::String("b".into())] + )), + ParsedEvent::Insert(( + Some(vec![Value::String("1".into())]), + vec![Value::String("2".into())] + )), + ParsedEvent::Insert(( + Some(vec![Value::String("a".into())]), + vec![Value::String("b".into())] + )), ParsedEvent::Insert(( Some(vec![Value::String("c".into())]), vec![Value::String("d".into())] @@ -183,7 +199,7 @@ fn test_csv_dir_recovery() -> eyre::Result<()> { ); } - std::fs::remove_file(inputs_dir_path.join("input1.csv")).unwrap(); + // std::fs::remove_file(inputs_dir_path.join("input1.csv")).unwrap(); std::fs::write( inputs_dir_path.join("input2.csv"), "key,value\nq,w\ne,r\nt,y\np,q", @@ -194,10 +210,36 @@ fn test_csv_dir_recovery() -> eyre::Result<()> { let data_stream = full_cycle_read_kv(TestedFormat::Csv, &inputs_dir_path, Some(&tracker))?; assert_eq!( data_stream.new_parsed_entries, - vec![ParsedEvent::Insert(( - Some(vec![Value::String("p".into())]), - vec![Value::String("q".into())] - ))] + vec![ + ParsedEvent::Delete(( + Some(vec![Value::String("q".into())]), + vec![Value::String("w".into())] + )), + ParsedEvent::Delete(( + Some(vec![Value::String("e".into())]), + vec![Value::String("r".into())] + )), + ParsedEvent::Delete(( + Some(vec![Value::String("t".into())]), + vec![Value::String("y".into())] + )), + ParsedEvent::Insert(( + Some(vec![Value::String("q".into())]), + vec![Value::String("w".into())] + )), + ParsedEvent::Insert(( + Some(vec![Value::String("e".into())]), + vec![Value::String("r".into())] + )), + ParsedEvent::Insert(( + Some(vec![Value::String("t".into())]), + vec![Value::String("y".into())] + )), + ParsedEvent::Insert(( + Some(vec![Value::String("p".into())]), + vec![Value::String("q".into())] + )) + ] ); } @@ -232,7 +274,7 @@ fn test_json_file_recovery() -> eyre::Result<()> { std::fs::write( &input_path, - r#"{"key", 1, "value": "a"} + r#"{"key": 1, "value": "a"} {"key": 2, "value": "b"} {"key": 3, "value": "c"}"#, ) @@ -242,10 +284,13 @@ fn test_json_file_recovery() -> eyre::Result<()> { let data_stream = full_cycle_read_kv(TestedFormat::Json, &input_path, Some(&tracker))?; 
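Aside (not part of the diff): the reworked expectations in these recovery tests all encode one rule: when a source object changes between runs, the rows produced from its previous version are retracted and the object's new contents are inserted in full. A small self-contained sketch of that retract-then-reinsert shape; `Event` is a simplified stand-in for `ParsedEvent`.

```rust
// Sketch of the replay semantics asserted by the updated recovery tests:
// old rows are retracted, then the new contents are inserted from scratch.
#[derive(Debug, PartialEq)]
enum Event {
    Insert(String),
    Delete(String),
}

fn diff_on_modification(old_rows: &[String], new_rows: &[String]) -> Vec<Event> {
    let mut events = Vec::new();
    // Retract everything produced from the previous version of the object...
    for row in old_rows {
        events.push(Event::Delete(row.clone()));
    }
    // ...then re-insert the object's new contents in full.
    for row in new_rows {
        events.push(Event::Insert(row.clone()));
    }
    events
}

fn main() {
    let old = vec!["1,2".to_string(), "a,b".to_string()];
    let new = vec!["1,2".to_string(), "a,b".to_string(), "c,d".to_string()];
    let events = diff_on_modification(&old, &new);
    assert_eq!(events.len(), 5); // 2 deletions followed by 3 insertions
    println!("{events:?}");
}
```

For the CSV case above, two previously seen rows plus one appended row yield exactly the two `Delete`s followed by three `Insert`s that the test asserts.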
assert_eq!( data_stream.new_parsed_entries, - vec![ParsedEvent::Insert(( - Some(vec![Value::Int(3)]), - vec![Value::String("c".into())] - )),] + vec![ + ParsedEvent::Delete((Some(vec![Value::Int(1)]), vec![Value::String("a".into())])), + ParsedEvent::Delete((Some(vec![Value::Int(2)]), vec![Value::String("b".into())])), + ParsedEvent::Insert((Some(vec![Value::Int(1)]), vec![Value::String("a".into())])), + ParsedEvent::Insert((Some(vec![Value::Int(2)]), vec![Value::String("b".into())])), + ParsedEvent::Insert((Some(vec![Value::Int(3)]), vec![Value::String("c".into())])), + ] ); } @@ -305,6 +350,10 @@ fn test_json_folder_recovery() -> eyre::Result<()> { assert_eq!( data_stream.new_parsed_entries, vec![ + ParsedEvent::Delete((Some(vec![Value::Int(3)]), vec![Value::String("c".into())])), + ParsedEvent::Delete((Some(vec![Value::Int(4)]), vec![Value::String("d".into())])), + ParsedEvent::Insert((Some(vec![Value::Int(3)]), vec![Value::String("c".into())])), + ParsedEvent::Insert((Some(vec![Value::Int(4)]), vec![Value::String("d".into())])), ParsedEvent::Insert((Some(vec![Value::Int(5)]), vec![Value::String("e".into())])), ParsedEvent::Insert((Some(vec![Value::Int(6)]), vec![Value::String("f".into())])), ParsedEvent::Insert((Some(vec![Value::Int(7)]), vec![Value::String("g".into())])), @@ -316,7 +365,7 @@ fn test_json_folder_recovery() -> eyre::Result<()> { } #[test] -fn test_json_recovery_from_empty_folder() -> eyre::Result<()> { +fn test_json_recovery_with_new_file() -> eyre::Result<()> { let test_storage = tempdir()?; let test_storage_path = test_storage.path(); @@ -350,9 +399,6 @@ fn test_json_recovery_from_empty_folder() -> eyre::Result<()> { ); } - std::fs::remove_file(inputs_dir_path.as_path().join("input1.json")).unwrap(); - std::fs::remove_file(inputs_dir_path.as_path().join("input2.json")).unwrap(); - std::fs::write( inputs_dir_path.as_path().join("input3.json"), r#"{"key": 5, "value": "e"}