diff --git a/docs/source/developer_guide/guides/2_real_world_phishing.md b/docs/source/developer_guide/guides/2_real_world_phishing.md
index c460af3e02..61ba218fd5 100644
--- a/docs/source/developer_guide/guides/2_real_world_phishing.md
+++ b/docs/source/developer_guide/guides/2_real_world_phishing.md
@@ -482,7 +482,6 @@ To explicitly set the output format we could specify the `file_type` argument to
 ```python
 import logging
 import os
-import tempfile
 
 import click
 
@@ -542,7 +541,7 @@ MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT']
 @click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
 @click.option(
     "--output_file",
-    default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
+    default=".tmp/output/phishing_detections.jsonlines",
     help="The path to the file where the inference output will be saved.",
 )
 def run_pipeline(use_stage_function: bool,
@@ -633,7 +632,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis
     monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \
     add-scores --label=is_phishing \
     serialize \
-    to-file --filename=/tmp/detections.jsonlines --overwrite
+    to-file --filename=.tmp/output/phishing_detections_cli.jsonlines --overwrite
 ```
 
 ## Stage Constructors
diff --git a/examples/abp_nvsmi_detection/README.md b/examples/abp_nvsmi_detection/README.md
index 244d729420..f7fed3e260 100644
--- a/examples/abp_nvsmi_detection/README.md
+++ b/examples/abp_nvsmi_detection/README.md
@@ -140,7 +140,7 @@ morpheus --log_level=DEBUG \
   `# 7th Stage: Convert from objects back into strings. Ignore verbose input data` \
   serialize --include 'mining' \
   `# 8th Stage: Write out the JSON lines to the detections.jsonlines file` \
-  to-file --filename=detections.jsonlines --overwrite
+  to-file --filename=.tmp/output/abp_nvsmi_detections.jsonlines --overwrite
 ```
 
 If successful, the following should be displayed:
@@ -217,7 +217,7 @@ Added stage: 
   └─ morpheus.ControlMessage -> morpheus.ControlMessage
 Added stage: 
   └─ morpheus.ControlMessage -> morpheus.MessageMeta
-Added stage: 
+Added stage: 
   └─ morpheus.MessageMeta -> morpheus.MessageMeta
 ====Building Pipeline Complete!====
 Starting! Time: 1656353254.9919598
@@ -225,7 +225,7 @@ Inference Rate[Complete]: 1242inf [00:00, 1863.04inf/s]
 ====Pipeline Complete====
 ```
 
-The output file `detections.jsonlines` will contain a single boolean value for each input line. At some point the values will switch from `0` to `1`:
+The output file `.tmp/output/abp_nvsmi_detections.jsonlines` will contain a single boolean value for each input line. At some point the values will switch from `0` to `1`:
 
 ```
 ...
diff --git a/examples/abp_pcap_detection/README.md b/examples/abp_pcap_detection/README.md
index 6dc63212af..9c04f5e68f 100644
--- a/examples/abp_pcap_detection/README.md
+++ b/examples/abp_pcap_detection/README.md
@@ -97,7 +97,7 @@ python examples/abp_pcap_detection/run.py
 ```
 Note: Both Morpheus and Triton Inference Server containers must have access to the same GPUs in order for this example to work.
 
-The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `pcap_out.jsonlines`.
+The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `.tmp/output/pcap_out.jsonlines`.
 
 ### CLI Example
 The above example is illustrative of using the Python API to build a custom Morpheus Pipeline.
@@ -118,6 +118,6 @@ morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preproc
     monitor --description "Add classification rate" --unit "add-class" \
     serialize \
     monitor --description "Serialize rate" --unit ser \
-    to-file --filename "pcap_out.jsonlines" --overwrite \
+    to-file --filename ".tmp/output/pcap_out_cli.jsonlines" --overwrite \
     monitor --description "Write to file rate" --unit "to-file"
 ```
diff --git a/examples/abp_pcap_detection/run.py b/examples/abp_pcap_detection/run.py
index b1a654bbd9..24405bad3c 100644
--- a/examples/abp_pcap_detection/run.py
+++ b/examples/abp_pcap_detection/run.py
@@ -65,7 +65,7 @@
 )
 @click.option(
     "--output_file",
-    default="./pcap_out.jsonlines",
+    default="./.tmp/output/pcap_out.jsonlines",
     help="The path to the file where the inference output will be saved.",
 )
 @click.option(
diff --git a/examples/cpu_only/README.md b/examples/cpu_only/README.md
index feac382a3f..3e8abd3233 100644
--- a/examples/cpu_only/README.md
+++ b/examples/cpu_only/README.md
@@ -53,7 +53,7 @@ Options:
 To launch the configured Morpheus pipeline with the sample data that is provided in `examples/data`, run the following:
 
 ```bash
-python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/out.jsonlines
+python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/output/cpu_only_out.jsonlines
 ```
 
 ### CLI Example
@@ -68,5 +68,5 @@ morpheus --log_level INFO \
     deserialize \
     monitor --description "deserialize" \
     serialize \
-    to-file --filename=.tmp/out.jsonlines --overwrite
+    to-file --filename=.tmp/output/cpu_only_cli_out.jsonlines --overwrite
 ```
diff --git a/examples/cpu_only/run.py b/examples/cpu_only/run.py
index f0a50a47e0..7cbc96a440 100644
--- a/examples/cpu_only/run.py
+++ b/examples/cpu_only/run.py
@@ -61,7 +61,7 @@
     "--out_file",
     help="Output file",
     type=click.Path(dir_okay=False),
-    default="output.csv",
+    default=".tmp/output/cpu_only_out.csv",
     required=True,
 )
 def run_pipeline(log_level: int, use_cpu_only: bool, in_file: pathlib.Path, out_file: pathlib.Path):
diff --git a/examples/developer_guide/2_1_real_world_phishing/run.py b/examples/developer_guide/2_1_real_world_phishing/run.py
index 64ae7d77dc..8f83fade5c 100755
--- a/examples/developer_guide/2_1_real_world_phishing/run.py
+++ b/examples/developer_guide/2_1_real_world_phishing/run.py
@@ -17,7 +17,6 @@
 
 import logging
 import os
-import tempfile
 
 import click
 from recipient_features_stage import RecipientFeaturesStage
@@ -77,7 +76,7 @@
 @click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
 @click.option(
     "--output_file",
-    default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
+    default=".tmp/output/phishing_detections.jsonlines",
     help="The path to the file where the inference output will be saved.",
 )
 def run_pipeline(use_stage_function: bool,
diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md
index c7206787a6..9110cc27ff 100644
--- a/examples/gnn_fraud_detection_pipeline/README.md
+++ b/examples/gnn_fraud_detection_pipeline/README.md
@@ -97,7 +97,7 @@ Added stage: 
   └─ morpheus.MessageMeta -> morpheus.MessageMeta
 Added stage: 
   └─ morpheus.MessageMeta -> morpheus.MessageMeta
-Added stage: 
+Added stage: 
   └─ morpheus.MessageMeta -> morpheus.MessageMeta
 ====Building Segment Complete!====
 Graph construction rate[Complete]: 265 messages [00:00, 1016.18 messages/s]
@@ -128,5 +128,5 @@ morpheus --log_level INFO \
     gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb.pt \
     monitor --description "Add classification rate" \
     serialize \
-    to-file --filename "output.csv" --overwrite
+    to-file --filename ".tmp/output/gnn_fraud_detection_cli_output.csv" --overwrite
 ```
diff --git a/examples/gnn_fraud_detection_pipeline/run.py b/examples/gnn_fraud_detection_pipeline/run.py
index a5de019ed7..73c3301d47 100644
--- a/examples/gnn_fraud_detection_pipeline/run.py
+++ b/examples/gnn_fraud_detection_pipeline/run.py
@@ -84,7 +84,7 @@
 @click.option(
     "--output_file",
     type=click.Path(dir_okay=False),
-    default="output.csv",
+    default=".tmp/output/gnn_fraud_detection_output.csv",
     help="The path to the file where the inference output will be saved.",
 )
 def run_pipeline(num_threads,
diff --git a/examples/log_parsing/README.md b/examples/log_parsing/README.md
index 5d2485a3bc..19cda49a66 100644
--- a/examples/log_parsing/README.md
+++ b/examples/log_parsing/README.md
@@ -119,6 +119,6 @@ morpheus --log_level INFO \
     monitor --description "Inference rate" --unit inf \
     log-postprocess --vocab_path ./models/training-tuning-scripts/sid-models/resources/bert-base-cased-vocab.txt \
                     --model_config_path=./models/log-parsing-models/log-parsing-config-20220418.json \
-    to-file --filename ./log-parsing-output.jsonlines --overwrite \
+    to-file --filename .tmp/output/log-parsing-cli-output.jsonlines --overwrite \
     monitor --description "Postprocessing rate"
 ```
diff --git a/examples/log_parsing/run.py b/examples/log_parsing/run.py
index a85379f166..20e836c4c8 100644
--- a/examples/log_parsing/run.py
+++ b/examples/log_parsing/run.py
@@ -60,7 +60,7 @@
 )
 @click.option(
     "--output_file",
-    default="log-parsing-output.jsonlines",
+    default=".tmp/output/log-parsing-output.jsonlines",
     help="The path to the file where the inference output will be saved.",
 )
 @click.option('--model_vocab_hash_file',
diff --git a/examples/nlp_si_detection/README.md b/examples/nlp_si_detection/README.md
index 37d4abfa1f..2ed3c8dcb0 100644
--- a/examples/nlp_si_detection/README.md
+++ b/examples/nlp_si_detection/README.md
@@ -131,8 +131,8 @@ morpheus --log_level=DEBUG \
   filter --filter_source=TENSOR \
   `# 8th Stage: Convert from objects back into strings` \
   serialize --exclude '^_ts_' \
-  `# 9th Stage: Write out the JSON lines to the detections.jsonlines file` \
-  to-file --filename=detections.jsonlines --overwrite
+  `# 9th Stage: Write out the JSON lines to the nlp_si_detections.jsonlines file` \
+  to-file --filename=.tmp/output/nlp_si_detections.jsonlines --overwrite
 ```
 
 If successful, the following should be displayed:
@@ -187,7 +187,7 @@ Added stage: 
   └─ morpheus.ControlMessage -> morpheus.ControlMessage
 Added stage: 
   └─ morpheus.ControlMessage -> morpheus.MessageMeta
-Added stage: 
+Added stage: 
   └─ morpheus.MessageMeta -> morpheus.MessageMeta
 ====Building Pipeline Complete!====
 Starting! Time: 1656352480.541071
@@ -196,7 +196,7 @@ Inference Rate[Complete]: 93085inf [00:07, 12673.63inf/s]
 ====Pipeline Complete====
 ```
 
-The output file `detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
+The output file `.tmp/output/nlp_si_detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
 * `address`
 * `bank_acct`
 * `credit_card`
diff --git a/examples/nlp_si_detection/run.sh b/examples/nlp_si_detection/run.sh
index 390418e545..d88f6a8ffb 100755
--- a/examples/nlp_si_detection/run.sh
+++ b/examples/nlp_si_detection/run.sh
@@ -29,4 +29,4 @@ morpheus --log_level=DEBUG \
   add-class \
   filter --filter_source=TENSOR \
   serialize --exclude '^_ts_' \
-  to-file --filename=detections.jsonlines --overwrite
+  to-file --filename=.tmp/output/nlp_si_detections.jsonlines --overwrite
diff --git a/examples/ransomware_detection/README.md b/examples/ransomware_detection/README.md
index e1f7197e1e..226dba098d 100644
--- a/examples/ransomware_detection/README.md
+++ b/examples/ransomware_detection/README.md
@@ -72,7 +72,7 @@ python examples/ransomware_detection/run.py --server_url=localhost:8000 \
     --sliding_window=3 \
     --model_name=ransomw-model-short-rf \
     --input_glob=./examples/data/appshield/*/snapshot-*/*.json \
-    --output_file=./ransomware_detection_output.jsonlines
+    --output_file=.tmp/output/ransomware_detection_output.jsonlines
 ```
 
 Input features for a short model can be taken from every three snapshots sequence, such as (1, 2, 3), or (2, 3, 4). The sliding window represents the number of subsequent snapshots that need to be taken into consideration when generating the input for a model. Sliding window for the medium model is `5` and for the long model it is `10`.
diff --git a/examples/ransomware_detection/run.py b/examples/ransomware_detection/run.py
index 4887e7ff1b..7bc8dbf487 100644
--- a/examples/ransomware_detection/run.py
+++ b/examples/ransomware_detection/run.py
@@ -21,9 +21,12 @@
 from stages.create_features import CreateFeaturesRWStage
 from stages.preprocessing import PreprocessingRWStage
 
+from morpheus.common import TypeId
 from morpheus.config import Config
 from morpheus.config import PipelineModes
+from morpheus.messages import MessageMeta
 from morpheus.pipeline.linear_pipeline import LinearPipeline
+from morpheus.pipeline.stage_decorator import stage
 from morpheus.stages.general.monitor_stage import MonitorStage
 from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
 from morpheus.stages.input.appshield_source_stage import AppShieldSourceStage
@@ -61,6 +64,12 @@
     type=click.IntRange(min=1),
     help="Max batch size to use for the model.",
 )
+@click.option(
+    "--pipeline_batch_size",
+    default=1024,
+    type=click.IntRange(min=1),
+    help=("Internal batch size for the pipeline. Can be much larger than the model batch size."),
+)
 @click.option(
     "--conf_file",
     type=click.STRING,
@@ -95,21 +104,22 @@
 @click.option(
     "--output_file",
     type=click.STRING,
-    default="./ransomware_detection_output.jsonlines",
+    default=".tmp/output/ransomware_detection_output.jsonlines",
     help="The path to the file where the inference output will be saved.",
 )
-def run_pipeline(debug,
-                 num_threads,
-                 n_dask_workers,
-                 threads_per_dask_worker,
-                 model_max_batch_size,
-                 conf_file,
-                 model_name,
-                 server_url,
-                 sliding_window,
-                 input_glob,
-                 watch_directory,
-                 output_file):
+def run_pipeline(debug: bool,
+                 num_threads: int,
+                 n_dask_workers: int,
+                 threads_per_dask_worker: int,
+                 model_max_batch_size: int,
+                 pipeline_batch_size: int,
+                 conf_file: str,
+                 model_name: str,
+                 server_url: str,
+                 sliding_window: int,
+                 input_glob: str,
+                 watch_directory: bool,
+                 output_file: str):
 
     if debug:
         configure_logging(log_level=logging.DEBUG)
@@ -125,6 +135,7 @@ def run_pipeline(debug,
     # Below properties are specified by the command line.
     config.num_threads = num_threads
    config.model_max_batch_size = model_max_batch_size
+    config.pipeline_batch_size = pipeline_batch_size
     config.feature_length = snapshot_fea_length * sliding_window
     config.class_labels = ["pred", "score"]
 
@@ -222,6 +233,18 @@ def run_pipeline(debug,
     # This stage logs the metrics (msg/sec) from the above stage.
     pipeline.add_stage(MonitorStage(config, description="Serialize rate"))
 
+    @stage(needed_columns={'timestamp_process': TypeId.STRING})
+    def concat_columns(msg: MessageMeta) -> MessageMeta:
+        """
+        This stage concatenates the timestamp and pid_process columns to create a unique field.
+        """
+        with msg.mutable_dataframe() as df:
+            df['timestamp_process'] = df['timestamp'] + df['pid_process']
+
+        return msg
+
+    pipeline.add_stage(concat_columns(config))
+
     # Add a write file stage.
     # This stage writes all messages to a file.
     pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
diff --git a/examples/root_cause_analysis/README.md b/examples/root_cause_analysis/README.md
index 45d36b8f0f..0a5c178de8 100644
--- a/examples/root_cause_analysis/README.md
+++ b/examples/root_cause_analysis/README.md
@@ -124,7 +124,7 @@ add-scores --label=is_root_cause \
 `# 7th Stage: Convert from objects back into strings` \
 serialize --exclude '^ts_' \
 `# 8th Stage: Write results out to CSV file` \
-to-file --filename=./root-cause-binary-output.jsonlines --overwrite
+to-file --filename=.tmp/output/root-cause-binary-output.jsonlines --overwrite
 ```
 
 If successful, the following should be displayed:
@@ -177,10 +177,10 @@ Added stage: 
   └─ morpheus.ControlMessage -> morpheus.ControlMessage
 Added stage: 
   └─ morpheus.ControlMessage -> morpheus.MessageMeta
-Added stage: 
+Added stage: 
   └─ morpheus.MessageMeta -> morpheus.MessageMeta
 Inference rate[Complete]: 473 inf [00:01, 340.43 inf/s]
 ====Pipeline Complete====
 ```
 
-The output file `root-cause-binary-output.jsonlines` will contain the original kernel log messages with an additional field `is_root_cause`. The value of the new field will be the root cause probability.
+The output file `.tmp/output/root-cause-binary-output.jsonlines` will contain the original kernel log messages with an additional field `is_root_cause`. The value of the new field will be the root cause probability.
diff --git a/python/morpheus/morpheus/config.py b/python/morpheus/morpheus/config.py
index 2bc589a186..16ba6ed86d 100644
--- a/python/morpheus/morpheus/config.py
+++ b/python/morpheus/morpheus/config.py
@@ -235,6 +235,7 @@ def freeze(self):
         """
         self._check_cpp_mode(fix_mis_match=not self.frozen)
         if not self.frozen:
+            self._validate_config()
             self.frozen = True
 
     def _check_cpp_mode(self, fix_mis_match: bool = False):
@@ -267,7 +268,6 @@ def pipeline_batch_size(self):
     @pipeline_batch_size.setter
     def pipeline_batch_size(self, value: int):
         self._pipeline_batch_size = value
-        self._validate_config()
 
     @property
     def model_max_batch_size(self):
@@ -276,7 +276,6 @@ def model_max_batch_size(self):
     @model_max_batch_size.setter
     def model_max_batch_size(self, value: int):
         self._model_max_batch_size = value
-        self._validate_config()
 
     def _validate_config(self):
         if self._pipeline_batch_size < self._model_max_batch_size:
diff --git a/tests/morpheus/test_config.py b/tests/morpheus/test_config.py
index 746acf3771..9ebce02c8b 100755
--- a/tests/morpheus/test_config.py
+++ b/tests/morpheus/test_config.py
@@ -159,6 +159,7 @@ def test_warning_model_batch_size_less_than_pipeline_batch_size(caplog: pytest.L
     config.pipeline_batch_size = 256
     with caplog.at_level(logging.WARNING):
         config.model_max_batch_size = 257
+        config.freeze()
         assert len(caplog.records) == 1
         import re
         assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None
@@ -169,6 +170,7 @@ def test_warning_pipeline_batch_size_less_than_model_batch_size(caplog: pytest.L
     config.model_max_batch_size = 8
     with caplog.at_level(logging.WARNING):
         config.pipeline_batch_size = 7
+        config.freeze()
         assert len(caplog.records) == 1
         import re
         assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None
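
A minimal sketch of the behavior implied by the `config.py` and `test_config.py` hunks above, assuming only the `morpheus.config.Config` API shown in the diff: the `pipeline_batch_size < model_max_batch_size` warning is no longer emitted from the property setters, but once when the configuration is frozen, which the updated tests trigger explicitly via `config.freeze()`. The logging setup below is illustrative only.

```python
import logging

from morpheus.config import Config

# Illustrative logging setup so the warning is visible on stderr.
logging.basicConfig(level=logging.WARNING)

config = Config()
config.pipeline_batch_size = 256   # with this change, no warning is emitted here
config.model_max_batch_size = 257  # nor here

# The mismatch warning (matching "pipeline_batch_size < model_max_batch_size")
# now surfaces once, at freeze time, rather than on every setter call.
config.freeze()
```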