Skip to content

Commit

Permalink
operator persistence - IO (#7675)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 540294dcf40e9aa75b19bf215586470396cd9497
  • Loading branch information
KamilPiechowiak authored and Manul from Pathway committed Dec 4, 2024
1 parent 7eec7c3 commit 922d5a6
Show file tree
Hide file tree
Showing 25 changed files with 2,193 additions and 215 deletions.
77 changes: 77 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ doctest = false
[dev-dependencies]
assert_matches = "1.5.0"
eyre = "0.6.12"
mockall = "0.13.1"

[dependencies]
arc-swap = "1.7.1"
Expand Down
43 changes: 38 additions & 5 deletions integration_tests/wordcount/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
STREAMING_MODE_NAME = "streaming"
FS_STORAGE_NAME = "fs"
S3_STORAGE_NAME = "s3"
INPUT_PERSISTENCE_MODE_NAME = "PERSISTING"
OPERATOR_PERSISTENCE_MODE_NAME = "OPERATOR_PERSISTING"


class PStoragePath:
Expand Down Expand Up @@ -76,6 +78,7 @@ def check_output_correctness(
latest_input_file, input_path, output_path, interrupted_run=False
):
input_word_counts = {}
old_input_word_counts = {}
new_file_lines = set()
distinct_new_words = set()

Expand All @@ -99,7 +102,11 @@ def check_output_correctness(
input_word_counts[word] = 0
input_word_counts[word] += 1

if not on_old_file:
if on_old_file:
if word not in old_input_word_counts:
old_input_word_counts[word] = 0
old_input_word_counts[word] += 1
else:
new_file_lines.add((word, input_word_counts[word]))
distinct_new_words.add(word)

Expand All @@ -113,6 +120,7 @@ def check_output_correctness(
is_first_row = True
word_column_index = None
count_column_index = None
diff_column_index = None
for row in f:
n_rows += 1
if is_first_row:
Expand All @@ -122,29 +130,44 @@ def check_output_correctness(
word_column_index = col_idx
elif col_name == "count":
count_column_index = col_idx
elif col_name == "diff":
diff_column_index = col_idx
is_first_row = False
assert (
word_column_index is not None
), "'word' is absent in CSV header"
assert (
count_column_index is not None
), "'count' is absent in CSV header"
assert (
diff_column_index is not None
), "'diff' is absent in CSV header"
continue

assert word_column_index is not None
assert count_column_index is not None
assert diff_column_index is not None
tokens = row.strip().split(",")
try:
word = tokens[word_column_index].strip('"')
count = tokens[count_column_index]
count = int(tokens[count_column_index])
diff = int(tokens[diff_column_index])
output_word_counts[word] = int(count)
except IndexError:
# line split in two chunks, one fsynced, another did not
if not interrupted_run:
raise

if (word, int(count)) not in new_file_lines:
n_old_lines += 1
if diff == 1:
if (word, count) not in new_file_lines:
n_old_lines += 1
elif diff == -1:
new_line_update = (word, count) in new_file_lines
old_line_update = old_input_word_counts.get(word) == count
if not (new_line_update or old_line_update):
n_old_lines += 1
else:
raise ValueError("Incorrect diff value: {diff}")
except FileNotFoundError:
if interrupted_run:
return False
Expand Down Expand Up @@ -187,12 +210,13 @@ def start_pw_computation(
pstorage_path,
mode,
pstorage_type,
persistence_mode,
first_port,
):
pw_wordcount_path = (
"/".join(os.path.abspath(__file__).split("/")[:-1])
+ f"/pw_wordcount.py --input {input_path} --output {output_path} --pstorage {pstorage_path} "
+ f"--mode {mode} --pstorage-type {pstorage_type}"
+ f"--mode {mode} --pstorage-type {pstorage_type} --persistence_mode {persistence_mode}"
)
n_cpus = n_threads * n_processes
cpu_list = ",".join([str(x) for x in range(n_cpus)])
Expand Down Expand Up @@ -223,6 +247,7 @@ def get_pw_program_run_time(
pstorage_path,
mode,
pstorage_type,
persistence_mode,
first_port,
):
needs_pw_program_launch = True
Expand All @@ -238,6 +263,7 @@ def get_pw_program_run_time(
pstorage_path=pstorage_path,
mode=mode,
pstorage_type=pstorage_type,
persistence_mode=persistence_mode,
first_port=first_port,
)
try:
Expand Down Expand Up @@ -300,6 +326,7 @@ def run_pw_program_suddenly_terminate(
min_work_time,
max_work_time,
pstorage_type,
persistence_mode,
first_port,
):
process_handles = start_pw_computation(
Expand All @@ -310,6 +337,7 @@ def run_pw_program_suddenly_terminate(
pstorage_path=pstorage_path,
mode=STATIC_MODE_NAME,
pstorage_type=pstorage_type,
persistence_mode=persistence_mode,
first_port=first_port,
)
try:
Expand Down Expand Up @@ -394,6 +422,7 @@ def do_test_persistent_wordcount(
tmp_path,
mode,
pstorage_type,
persistence_mode,
first_port,
):
inputs_path = tmp_path / "inputs"
Expand All @@ -414,6 +443,7 @@ def do_test_persistent_wordcount(
pstorage_path=pstorage_path,
mode=mode,
pstorage_type=pstorage_type,
persistence_mode=persistence_mode,
first_port=first_port,
)
print(f"Run {n_run}: pathway time elapsed {elapsed}")
Expand All @@ -432,6 +462,7 @@ def do_test_failure_recovery_static(
min_work_time,
max_work_time,
pstorage_type,
persistence_mode,
first_port,
):
inputs_path = tmp_path / "inputs"
Expand All @@ -454,6 +485,7 @@ def do_test_failure_recovery_static(
min_work_time=min_work_time,
max_work_time=max_work_time,
pstorage_type=pstorage_type,
persistence_mode=persistence_mode,
first_port=first_port,
)

Expand All @@ -474,6 +506,7 @@ def do_test_failure_recovery_static(
pstorage_path=pstorage_path,
mode=STATIC_MODE_NAME,
pstorage_type=pstorage_type,
persistence_mode=persistence_mode,
first_port=first_port,
)
print("Time elapsed for non-interrupted run:", elapsed)
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/wordcount/pw_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@
parser.add_argument("--pstorage", type=str)
parser.add_argument("--mode", type=str)
parser.add_argument("--pstorage-type", type=str)
parser.add_argument("--persistence_mode", type=str)
args = parser.parse_args()

if args.persistence_mode == "PERSISTING":
persistence_mode = pw.PersistenceMode.PERSISTING
elif args.persistence_mode == "OPERATOR_PERSISTING":
persistence_mode = pw.PersistenceMode.OPERATOR_PERSISTING
else:
ValueError(f"Unsupported persistence mode: {args.persistence_mode}")

if args.pstorage_type == "fs":
pstorage_config = pw.persistence.Config(
pw.persistence.Backend.filesystem(path=args.pstorage),
snapshot_interval_ms=5000,
persistence_mode=persistence_mode,
)
elif args.pstorage_type == "s3":
aws_s3_settings = pw.io.s3.AwsS3Settings(
Expand All @@ -33,6 +42,7 @@
bucket_settings=aws_s3_settings,
),
snapshot_interval_ms=5000,
persistence_mode=persistence_mode,
)
else:
raise ValueError(f"Unknown persistent storage type: {args.pstorage_type}")
Expand Down
14 changes: 14 additions & 0 deletions integration_tests/wordcount/test_new_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from .base import (
FS_STORAGE_NAME,
INPUT_PERSISTENCE_MODE_NAME,
OPERATOR_PERSISTENCE_MODE_NAME,
S3_STORAGE_NAME,
STATIC_MODE_NAME,
STREAMING_MODE_NAME,
Expand All @@ -20,6 +22,9 @@
"n_threads,n_processes", [(1, 1), (2, 1), (4, 1), (1, 4), (1, 2), (2, 2)]
)
@pytest.mark.parametrize("pstorage_type", [S3_STORAGE_NAME, FS_STORAGE_NAME])
@pytest.mark.parametrize(
"persistence_mode", [INPUT_PERSISTENCE_MODE_NAME, OPERATOR_PERSISTENCE_MODE_NAME]
)
@pytest.mark.parametrize(
"n_backfilling_runs,mode",
[
Expand All @@ -33,6 +38,7 @@ def test_integration_new_data(
n_processes,
mode,
pstorage_type,
persistence_mode,
tmp_path: pathlib.Path,
port: int,
):
Expand All @@ -43,6 +49,7 @@ def test_integration_new_data(
tmp_path=tmp_path,
mode=mode,
pstorage_type=pstorage_type,
persistence_mode=persistence_mode,
first_port=port,
)

Expand All @@ -64,6 +71,12 @@ def test_integration_new_data(
choices=["s3", "fs"],
default="fs",
)
parser.add_argument(
"--persistence_mode",
type=str,
choices=[INPUT_PERSISTENCE_MODE_NAME, OPERATOR_PERSISTENCE_MODE_NAME],
default=INPUT_PERSISTENCE_MODE_NAME,
)
args = parser.parse_args()

do_test_persistent_wordcount(
Expand All @@ -73,5 +86,6 @@ def test_integration_new_data(
tmp_path=pathlib.Path("./"),
mode=args.mode,
pstorage_type=args.pstorage_type,
persistence_mode=args.persistence_mode,
first_port=56700,
)
Loading

0 comments on commit 922d5a6

Please sign in to comment.