diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml
index 600827ffa6..5a9ddd71a6 100644
--- a/docker/conda/environments/cuda11.8_dev.yml
+++ b/docker/conda/environments/cuda11.8_dev.yml
@@ -26,6 +26,7 @@ dependencies:
   - automake=1.16.5
   - benchmark=1.6.1
   - boost-cpp=1.74
+  - boto3
   - cachetools=5.0.0
   - ccache>=3.7
   - clangdev=14
@@ -91,6 +92,7 @@ dependencies:
   - pytorch=2.0.1
   - rapidjson=1.1.0
   - requests=2.31
+  - s3fs>=2023.6
   - scikit-build=0.17.1
   - scikit-learn=1.2.2
   - sphinx
diff --git a/morpheus/cli/commands.py b/morpheus/cli/commands.py
index fae791cf0f..8fc7096f1a 100644
--- a/morpheus/cli/commands.py
+++ b/morpheus/cli/commands.py
@@ -650,6 +650,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
 add_command("delay", "morpheus.stages.general.delay_stage.DelayStage", modes=ALL)
 add_command("deserialize", "morpheus.stages.preprocess.deserialize_stage.DeserializeStage", modes=NOT_AE)
 add_command("dropna", "morpheus.stages.preprocess.drop_null_stage.DropNullStage", modes=NOT_AE)
+add_command("file-source", "morpheus.stages.input.file_source.FileSource", modes=NOT_AE)
 add_command("filter", "morpheus.stages.postprocess.filter_detections_stage.FilterDetectionsStage", modes=ALL)
 add_command("from-azure", "morpheus.stages.input.azure_source_stage.AzureSourceStage", modes=AE_ONLY)
 add_command("from-appshield", "morpheus.stages.input.appshield_source_stage.AppShieldSourceStage", modes=FIL_ONLY)
diff --git a/morpheus/stages/input/file_source.py b/morpheus/stages/input/file_source.py
new file mode 100644
index 0000000000..2e9873f3bb
--- /dev/null
+++ b/morpheus/stages/input/file_source.py
@@ -0,0 +1,267 @@
+# Copyright (c) 2022-2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""File source stage."""
+
+import logging
+import time
+import typing
+from functools import partial
+from urllib.parse import urlsplit
+
+import fsspec
+import mrc
+import s3fs
+from mrc.core import operators as ops
+
+from morpheus.cli import register_stage
+from morpheus.common import FileTypes
+from morpheus.config import Config
+from morpheus.io.deserializers import read_file_to_df
+from morpheus.messages import MessageMeta
+from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
+from morpheus.pipeline.single_output_source import SingleOutputSource
+from morpheus.pipeline.stream_pair import StreamPair
+
+logger = logging.getLogger(__name__)
+
+
+@register_stage("file-source")
+class FileSource(PreallocatorMixin, SingleOutputSource):
+    """
+    Load messages from a file.
+
+    FileSource is used to produce messages loaded from a file. Useful for testing performance and
+    accuracy of a pipeline.
+
+    Parameters
+    ----------
+    config : morpheus.config.Config
+        Pipeline configuration instance.
+    files : List[str]
+        List of paths to read from. Paths can be S3 URLs (`s3://path`) and can include wildcard characters `*`
+        as defined by `fsspec`:
+        https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files
+    watch : bool, default = False
+        When True, will check `files` for new files and emit them as they appear.
+    watch_interval : float, default = 1.0
+        When `watch` is True, this is the time in seconds between polling the paths in `files` for new files.
+    sort : bool, default = False
+        When True, the list of files will be processed in sorted order.
+    file_type : morpheus.common.FileTypes, optional, case_sensitive = False
+        Indicates what type of file to read. Specifying 'auto' will determine the file type from the extension.
+        Supported extensions: 'csv', 'json', 'jsonlines' and 'parquet'.
+    parser_kwargs : dict, default = None
+        Extra options to pass to the file parser.
+    max_files : int, default = -1
+        Max number of files to read. Useful for debugging to limit startup time. Default value of -1 is unlimited.
+    storage_connection_kwargs : dict, default = None
+        Extra settings that are relevant to a specific storage connection used by `fsspec.open_files`.
+    """
+
+    def __init__(self,
+                 config: Config,
+                 files: typing.List[str],
+                 watch: bool = False,
+                 watch_interval: float = 1.0,
+                 sort: bool = False,
+                 file_type: FileTypes = FileTypes.Auto,
+                 parser_kwargs: dict = None,
+                 max_files: int = -1,
+                 storage_connection_kwargs: dict = None):
+
+        super().__init__(config)
+
+        if not files:
+            raise ValueError("The 'files' list cannot be empty.")
+
+        if watch and len(files) != 1:
+            raise ValueError("When 'watch' is True, the 'files' should contain exactly one file path.")
+
+        self._files = list(files)
+        self._protocols = self._extract_unique_protocols()
+
+        if len(self._protocols) > 1:
+            raise ValueError("All paths in 'files' must use the same protocol, but multiple protocols were received.")
+
+        self._watch = watch
+        self._sort = sort
+        self._file_type = file_type
+        self._parser_kwargs = parser_kwargs or {}
+        self._storage_connection_kwargs = storage_connection_kwargs or {}
+        self._watch_interval = watch_interval
+        self._max_files = max_files
+
+    @property
+    def name(self) -> str:
+        """Return the name of the stage."""
+        return "file-source"
+
+    def supports_cpp_node(self) -> bool:
+        """Indicates whether or not this stage supports a C++ node."""
+        return False
+
+    def _extract_unique_protocols(self) -> set:
+        """Extracts unique protocols from the given file paths."""
+        protocols = set()
+
+        for file in self._files:
+            scheme = urlsplit(file).scheme
+            if scheme:
+                protocols.add(scheme.lower())
+            else:
+                protocols.add("file")
+
+        return protocols
+
+    def _build_source(self, builder: mrc.Builder) -> StreamPair:
+
+        if self._build_cpp_node():
+            raise RuntimeError("FileSource does not support C++ nodes.")
+
+        if self._watch:
+            generator_function = self._polling_generate_frames_fsspec
+        else:
+            generator_function = self._generate_frames_fsspec
+
+        out_stream = builder.make_source(self.unique_name, generator_function())
+        out_type = fsspec.core.OpenFiles
+
+        # The source node emits batches of fsspec OpenFiles; _post_build_single converts them to MessageMeta.
+        return out_stream, out_type
+
+    def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
+
+        files: fsspec.core.OpenFiles = fsspec.open_files(self._files, **self._storage_connection_kwargs)
+
+        if (len(files) == 0):
+            raise RuntimeError(f"No files matched input strings: '{self._files}'. "
" + "Check your input pattern and ensure any credentials are correct.") + + if self._sort: + files = sorted(files, key=lambda f: f.full_name) + + if self._max_files > 0: + files = files[:self._max_files] + + yield files + + def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]: + files_seen = set() + curr_time = time.monotonic() + next_update_epoch = curr_time + processed_files_count = 0 + has_s3_protocol = "s3" in self._protocols + + while (True): + # Before doing any work, find the next update epoch after the current time + while (next_update_epoch <= curr_time): + # Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals + next_update_epoch += self._watch_interval + + file_set = set() + filtered_files = [] + + # Clear cached instance, otherwise we don't receive newly touched files. + if has_s3_protocol: + s3fs.S3FileSystem.clear_instance_cache() + + files = fsspec.open_files(self._files, **self._storage_connection_kwargs) + + for file in files: + file_set.add(file.full_name) + if file.full_name not in files_seen: + filtered_files.append(file) + + # Replace files_seen with the new set of files. This prevents a memory leak that could occurr if files are + # deleted from the input directory. In addition if a file with a given name was created, seen/processed by + # the stage, and then deleted, and a new file with the same name appeared sometime later, the stage will + # need to re-ingest that new file. + files_seen = file_set + + if len(filtered_files) > 0: + + if self._sort: + filtered_files = sorted(filtered_files, key=lambda f: f.full_name) + + if self._max_files > 0: + filtered_files = filtered_files[:self._max_files - processed_files_count] + processed_files_count += len(filtered_files) + + if self._max_files <= processed_files_count: + logger.debug("Maximum file limit reached. Exiting polling service...") + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) + break + + yield fsspec.core.OpenFiles(filtered_files, fs=files.fs) + + curr_time = time.monotonic() + + # If we spent more than `self._watch_interval` doing work and/or yielding to the output channel blocked, + # then we should only sleep for the remaining time until the next update epoch. + sleep_duration = next_update_epoch - curr_time + if (sleep_duration > 0): + time.sleep(sleep_duration) + curr_time = time.monotonic() + + @staticmethod + def generate_frames(file: fsspec.core.OpenFile, file_type: FileTypes, parser_kwargs: dict) -> MessageMeta: + """ + Generate message frame from a file. + + This function reads data from a file and generates message frames (MessageMeta) based on the file's content. + It can be used to load and process messages from a file for testing and analysis within a Morpheus pipeline. + + Parameters + ---------- + file : fsspec.core.OpenFile + An open file object using fsspec. + file_type : FileTypes + Indicates the type of the file to read. Supported types include 'csv', 'json', 'jsonlines', and 'parquet'. + parser_kwargs : dict + Additional keyword arguments to pass to the file parser. + + Returns + ------- + MessageMeta + MessageMeta object, each containing a dataframe of messages from the file. 
+ """ + df = read_file_to_df( + file.full_name, + file_type=file_type, + filter_nulls=False, + parser_kwargs=parser_kwargs, + df_type="cudf", + ) + + meta = MessageMeta(df) + + return meta + + def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair: + + out_stream = out_pair[0] + + post_node = builder.make_node( + self.unique_name + "-post", + ops.flatten(), # Flatten list of open fsspec files + ops.map(partial(self.generate_frames, file_type=self._file_type, + parser_kwargs=self._parser_kwargs)) # Generate dataframe for each file + ) + + builder.make_edge(out_stream, post_node) + + out_stream = post_node + out_type = MessageMeta + + return super()._post_build_single(builder, (out_stream, out_type)) diff --git a/tests/test_file_source.py b/tests/test_file_source.py new file mode 100644 index 0000000000..07d5557f0a --- /dev/null +++ b/tests/test_file_source.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from unittest import mock + +import fsspec +import pytest + +import cudf + +from _utils import TEST_DIRS +from morpheus.common import FileTypes +from morpheus.messages.message_meta import MessageMeta +from morpheus.pipeline.pipeline import Pipeline +from morpheus.stages.input.file_source import FileSource +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage + + +@pytest.mark.use_python +@pytest.mark.parametrize("input_files,watch, protocols", + [(["file1.json", "file2.json"], False, ["file"]), + (["file://file1.json", "file2.json"], False, ["file"]), + (["file:///file1.json"], False, ["file"]), (["test_data/*.json"], True, ["file"]), + (["s3://test_data/file1.json", "s3://test_data/file2.json"], False, ["s3"]), + (["s3://test_data/*.json"], True, ["s3"])]) +def test_constructor(config, input_files, watch, protocols): + source = FileSource(config, files=input_files, watch=watch) + assert sorted(source._protocols) == protocols + + +@pytest.mark.use_python +@pytest.mark.parametrize( + "input_files,watch,error_msg", + [(["file1.json", "file2.json"], True, "When 'watch' is True, the 'files' should contain exactly one file path."), + ([], True, "The 'files' cannot be empty."), ([], False, "The 'files' cannot be empty."), + (None, True, "The 'files' cannot be empty."), (None, False, "The 'files' cannot be empty."), + (["file1.json", "s3://test_data/file2.json"], + True, + "When 'watch' is True, the 'files' should contain exactly one file path."), + (["file1.json", "s3://test_data/file2.json"], + False, + "Accepts same protocol input files, but it received multiple protocols.")]) +def test_constructor_error(config, input_files, watch, error_msg): + with pytest.raises(ValueError, match=error_msg): + FileSource(config, files=input_files, watch=watch) + + +@pytest.mark.use_python 
+@pytest.mark.parametrize("input_file,filetypes,parser_kwargs,expected_df_count",
+                         [("filter_probs.json", FileTypes.Auto, {"lines": False}, 20),
+                          ("filter_probs.jsonlines", FileTypes.JSON, {"lines": True}, 20)])
+def test_generate_frames(input_file, filetypes, parser_kwargs, expected_df_count):
+    in_file = fsspec.open(os.path.join(TEST_DIRS.tests_data_dir, input_file))
+
+    meta = FileSource.generate_frames(file=in_file, file_type=filetypes, parser_kwargs=parser_kwargs)
+
+    assert len(meta.df.columns) == 4
+    assert len(meta.df) == expected_df_count
+    assert isinstance(meta, MessageMeta)
+    assert isinstance(meta.df, cudf.DataFrame)
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize(
+    "input_files,parser_kwargs,max_files,watch,storage_connection_kwargs,expected_result",
+    [([
+        "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json",
+        "s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json"
+    ], {"lines": False, "orient": "records"}, -1, False, None, 2),
+     ([
+         "/rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json",
+         "/rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T12_09_47.901Z.json"
+     ], {"lines": False, "orient": "records"}, -1, False, {"protocol": "s3"}, 2),
+     ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, -1, False, None, 3),
+     ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, None, RuntimeError),
+     ([f'file:/{os.path.join(TEST_DIRS.tests_data_dir, "triton_abp_inf_results.csv")}'],
+      None, -1, False, None, FileNotFoundError),
+     ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, -1, False, None, 3),
+     (["s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022-08-01T00_05_06.806Z.json"],
+      {"lines": False, "orient": "records"}, 1, True, None, 1),
+     ([os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")], None, 2, False, None, 2),
+     ([f'file://{os.path.join(TEST_DIRS.tests_data_dir, "triton_*.csv")}'], None, 3, True, None, 3)])
+def test_filesource_pipe(config,
+                         input_files,
+                         parser_kwargs,
+                         max_files,
+                         watch,
+                         storage_connection_kwargs,
+                         expected_result):
+
+    pipe = Pipeline(config)
+
+    file_source_stage = FileSource(config,
+                                   files=input_files,
+                                   watch=watch,
+                                   max_files=max_files,
+                                   parser_kwargs=parser_kwargs,
+                                   storage_connection_kwargs=storage_connection_kwargs)
+    sink_stage = InMemorySinkStage(config)
+
+    pipe.add_stage(file_source_stage)
+    pipe.add_stage(sink_stage)
+
+    pipe.add_edge(file_source_stage, sink_stage)
+
+    if expected_result in (RuntimeError, FileNotFoundError):
+        with pytest.raises(expected_result):
+            pipe.run()
+    else:
+        pipe.run()
+
+        assert len(sink_stage.get_messages()) == expected_result
+
+
+@pytest.mark.use_python
+@pytest.mark.parametrize("watch", [True, False])
+@mock.patch.object(FileSource, '_polling_generate_frames_fsspec')
+@mock.patch.object(FileSource, '_generate_frames_fsspec')
+def test_build_source(mock_generate_frames_fsspec, mock_polling_generate_frames_fsspec, watch, config):
+    files = ["s3://rapidsai-data/cyber/morpheus/dfp/duo/DUO_2022*.json"]
+    source = FileSource(config=config, files=files, watch=watch)
+
+    mock_node = mock.MagicMock()
+    mock_builder = mock.MagicMock()
+    mock_builder.make_source.return_value = mock_node
+    out_stream, out_type = source._build_source(mock_builder)
+
+    if watch:
+        mock_polling_generate_frames_fsspec.assert_called_once()
+        mock_generate_frames_fsspec.assert_not_called()
+    else:
+        mock_generate_frames_fsspec.assert_called_once()
+        mock_polling_generate_frames_fsspec.assert_not_called()
+
+    assert out_stream == mock_node
+    assert out_type == fsspec.core.OpenFiles
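
Reviewer note (not part of the diff): a minimal usage sketch of the new stage. The input glob below is hypothetical; the Config, Pipeline, InMemorySinkStage, and get_messages() wiring mirrors what test_filesource_pipe above exercises.

# Usage sketch for FileSource, assuming a local directory of JSON-lines files.
# Swap in real paths or s3:// URLs; for public S3 buckets, fsspec/s3fs accepts
# e.g. storage_connection_kwargs={"anon": True}.
from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.input.file_source import FileSource
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage

config = Config()
pipe = Pipeline(config)

# One MessageMeta (one cuDF DataFrame) is emitted per matched file.
source = FileSource(config,
                    files=["./input_data/*.jsonlines"],  # hypothetical path
                    watch=False,  # set True (with a single glob) to poll for new files
                    sort=True,
                    max_files=10,
                    parser_kwargs={"lines": True})
sink = InMemorySinkStage(config)

pipe.add_stage(source)
pipe.add_stage(sink)
pipe.add_edge(source, sink)

pipe.run()
print(f"Received {len(sink.get_messages())} messages")

Because each matched file becomes exactly one MessageMeta, the sink's message count equals the number of files read, which is the property test_filesource_pipe asserts.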