Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc cleanups to example pipelines #2049

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,18 @@ examples/digital_fingerprinting/visualization/*-dfp-output/
# Ignore saved benchmark data
tests/benchmarks/.benchmarks/
examples/digital_fingerprinting/production/benchmarks/.benchmarks/

# Ignore output files from examples
/abp_nvsmi_detections.jsonlines
/detections.jsonlines
/gnn_fraud_detection_cli_output.csv
/gnn_fraud_detection_output.csv
/log-parsing-output.jsonlines
/log-parsing-cli-output.jsonlines
/nlp_si_detections.jsonlines
/pcap_out.jsonlines
/pcap_out_cli.jsonlines
/phishing_detections.jsonlines
/phishing_detections_cli.jsonlines
/ransomware_detection_output.jsonlines
/root-cause-binary-output.jsonlines
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 2 additions & 3 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ To explicitly set the output format we could specify the `file_type` argument to
```python
import logging
import os
import tempfile

import click

Expand Down Expand Up @@ -542,7 +541,7 @@ MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT']
@click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
@click.option(
"--output_file",
default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
default="phishing_detections.jsonlines",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(use_stage_function: bool,
Expand Down Expand Up @@ -633,7 +632,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis
monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \
add-scores --label=is_phishing \
serialize \
to-file --filename=/tmp/detections.jsonlines --overwrite
to-file --filename=phishing_detections_cli.jsonlines --overwrite
```

## Stage Constructors
Expand Down
4 changes: 2 additions & 2 deletions examples/abp_nvsmi_detection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ morpheus --log_level=DEBUG \
`# 7th Stage: Convert from objects back into strings. Ignore verbose input data` \
serialize --include 'mining' \
`# 8th Stage: Write out the JSON lines to the detections.jsonlines file` \
to-file --filename=detections.jsonlines --overwrite
to-file --filename=abp_nvsmi_detections.jsonlines --overwrite
```

If successful, the following should be displayed:
Expand Down Expand Up @@ -217,7 +217,7 @@ Added stage: <add-class-5; AddClassificationsStage(threshold=0.5, labels=[], pre
└─ morpheus.ControlMessage -> morpheus.ControlMessage
Added stage: <serialize-6; SerializeStage(include=['mining'], exclude=['^ID$', '^_ts_'], fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
Added stage: <to-file-7; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
Added stage: <to-file-7; WriteToFileStage(filename=abp_nvsmi_detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1656353254.9919598
Expand Down
2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,6 @@ morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preproc
monitor --description "Add classification rate" --unit "add-class" \
serialize \
monitor --description "Serialize rate" --unit ser \
to-file --filename "pcap_out.jsonlines" --overwrite \
to-file --filename "pcap_out_cli.jsonlines" --overwrite \
monitor --description "Write to file rate" --unit "to-file"
```
3 changes: 1 addition & 2 deletions examples/developer_guide/2_1_real_world_phishing/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import logging
import os
import tempfile

import click
from recipient_features_stage import RecipientFeaturesStage
Expand Down Expand Up @@ -77,7 +76,7 @@
@click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
@click.option(
"--output_file",
default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
default="phishing_detections.jsonlines",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(use_stage_function: bool,
Expand Down
4 changes: 2 additions & 2 deletions examples/gnn_fraud_detection_pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Added stage: <serialize-8; SerializeStage(include=None, exclude=None, fixed_colu
└─ morpheus.ControlMessage -> morpheus.MessageMeta
Added stage: <monitor-9; MonitorStage(description=Serialize rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <to-file-10; WriteToFileStage(filename=output.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
Added stage: <to-file-10; WriteToFileStage(filename=gnn_fraud_detection_output.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Graph construction rate[Complete]: 265 messages [00:00, 1016.18 messages/s]
Expand Down Expand Up @@ -128,5 +128,5 @@ morpheus --log_level INFO \
gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb.pt \
monitor --description "Add classification rate" \
serialize \
to-file --filename "output.csv" --overwrite
to-file --filename "gnn_fraud_detection_cli_output.csv" --overwrite
```
2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
@click.option(
"--output_file",
type=click.Path(dir_okay=False),
default="output.csv",
default="gnn_fraud_detection_output.csv",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(num_threads,
Expand Down
2 changes: 1 addition & 1 deletion examples/log_parsing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@ morpheus --log_level INFO \
monitor --description "Inference rate" --unit inf \
log-postprocess --vocab_path ./models/training-tuning-scripts/sid-models/resources/bert-base-cased-vocab.txt \
--model_config_path=./models/log-parsing-models/log-parsing-config-20220418.json \
to-file --filename ./log-parsing-output.jsonlines --overwrite \
to-file --filename ./log-parsing-cli-output.jsonlines --overwrite \
monitor --description "Postprocessing rate"
```
8 changes: 4 additions & 4 deletions examples/nlp_si_detection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ morpheus --log_level=DEBUG \
filter --filter_source=TENSOR \
`# 8th Stage: Convert from objects back into strings` \
serialize --exclude '^_ts_' \
`# 9th Stage: Write out the JSON lines to the detections.jsonlines file` \
to-file --filename=detections.jsonlines --overwrite
`# 9th Stage: Write out the JSON lines to the nlp_si_detections.jsonlines file` \
to-file --filename=nlp_si_detections.jsonlines --overwrite
```

If successful, the following should be displayed:
Expand Down Expand Up @@ -187,7 +187,7 @@ Added stage: <add-class-5; AddClassificationsStage(threshold=0.5, labels=[], pre
└─ morpheus.ControlMessage -> morpheus.ControlMessage
Added stage: <serialize-6; SerializeStage(include=[], exclude=['^_ts_'], fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
Added stage: <to-file-7; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
Added stage: <to-file-7; WriteToFileStage(filename=nlp_si_detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1656352480.541071
Expand All @@ -196,7 +196,7 @@ Inference Rate[Complete]: 93085inf [00:07, 12673.63inf/s]

```

The output file `detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
The output file `nlp_si_detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
* `address`
* `bank_acct`
* `credit_card`
Expand Down
47 changes: 35 additions & 12 deletions examples/ransomware_detection/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
from stages.create_features import CreateFeaturesRWStage
from stages.preprocessing import PreprocessingRWStage

from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import MessageMeta
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.input.appshield_source_stage import AppShieldSourceStage
Expand Down Expand Up @@ -61,6 +64,12 @@
type=click.IntRange(min=1),
help="Max batch size to use for the model.",
)
@click.option(
"--pipeline_batch_size",
default=1024,
type=click.IntRange(min=1),
help=("Internal batch size for the pipeline. Can be much larger than the model batch size."),
)
@click.option(
"--conf_file",
type=click.STRING,
Expand Down Expand Up @@ -98,18 +107,19 @@
default="./ransomware_detection_output.jsonlines",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(debug,
num_threads,
n_dask_workers,
threads_per_dask_worker,
model_max_batch_size,
conf_file,
model_name,
server_url,
sliding_window,
input_glob,
watch_directory,
output_file):
def run_pipeline(debug: bool,
num_threads: int,
n_dask_workers: int,
threads_per_dask_worker: int,
model_max_batch_size: int,
pipeline_batch_size: int,
conf_file: str,
model_name: str,
server_url: str,
sliding_window: int,
input_glob: str,
watch_directory: bool,
output_file: str):

if debug:
configure_logging(log_level=logging.DEBUG)
Expand All @@ -125,6 +135,7 @@ def run_pipeline(debug,
# Below properties are specified by the command line.
config.num_threads = num_threads
config.model_max_batch_size = model_max_batch_size
config.pipeline_batch_size = pipeline_batch_size
config.feature_length = snapshot_fea_length * sliding_window
config.class_labels = ["pred", "score"]

Expand Down Expand Up @@ -222,6 +233,18 @@ def run_pipeline(debug,
# This stage logs the metrics (msg/sec) from the above stage.
pipeline.add_stage(MonitorStage(config, description="Serialize rate"))

@stage(needed_columns={'timestamp_process': TypeId.STRING})
def concat_columns(msg: MessageMeta) -> MessageMeta:
"""
This stage concatinates the timestamp and pid_process columns to create a unique field.
"""
with msg.mutable_dataframe() as df:
df['timestamp_process'] = df['timestamp'] + df['pid_process']

return msg

pipeline.add_stage(concat_columns(config))

# Add a write file stage.
# This stage writes all messages to a file.
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
Expand Down
3 changes: 1 addition & 2 deletions python/morpheus/morpheus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def freeze(self):
"""
self._check_cpp_mode(fix_mis_match=not self.frozen)
if not self.frozen:
self._validate_config()
self.frozen = True

def _check_cpp_mode(self, fix_mis_match: bool = False):
Expand Down Expand Up @@ -267,7 +268,6 @@ def pipeline_batch_size(self):
@pipeline_batch_size.setter
def pipeline_batch_size(self, value: int):
self._pipeline_batch_size = value
self._validate_config()

@property
def model_max_batch_size(self):
Expand All @@ -276,7 +276,6 @@ def model_max_batch_size(self):
@model_max_batch_size.setter
def model_max_batch_size(self, value: int):
self._model_max_batch_size = value
self._validate_config()

def _validate_config(self):
if self._pipeline_batch_size < self._model_max_batch_size:
Expand Down
5 changes: 3 additions & 2 deletions python/morpheus/morpheus/utils/compare_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def compare_df(df_a: pd.DataFrame,

total_rows = len(df_a_filtered)
diff_rows = len(df_a_filtered) - int(comparison.count_matching_rows())
diff_cols = len(extra_columns) + len(missing_columns)

if (comparison.matches()):
logger.info("Results match validation dataset")
Expand All @@ -141,7 +142,7 @@ def compare_df(df_a: pd.DataFrame,

mismatch_df = merged.loc[mismatched_idx]

if diff_rows > 0:
if diff_rows > 0 or diff_cols > 0:
logger.debug("Results do not match. Diff %d/%d (%f %%). First 10 mismatched rows:",
diff_rows,
total_rows,
Expand All @@ -160,5 +161,5 @@ def compare_df(df_a: pd.DataFrame,
"matching_cols": list(same_columns),
"extra_cols": list(extra_columns),
"missing_cols": list(missing_columns),
"diff_cols": len(extra_columns) + len(missing_columns)
"diff_cols": diff_cols
}
2 changes: 1 addition & 1 deletion scripts/compare_data_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def main():
abs_tol=args.abs_tol,
rel_tol=args.rel_tol)

if results['diff_rows'] > 0:
if results['diff_rows'] > 0 or results['diff_cols'] > 0:
sys.exit(1)


Expand Down
2 changes: 2 additions & 0 deletions tests/morpheus/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def test_warning_model_batch_size_less_than_pipeline_batch_size(caplog: pytest.L
config.pipeline_batch_size = 256
with caplog.at_level(logging.WARNING):
config.model_max_batch_size = 257
config.freeze()
assert len(caplog.records) == 1
import re
assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None
Expand All @@ -169,6 +170,7 @@ def test_warning_pipeline_batch_size_less_than_model_batch_size(caplog: pytest.L
config.model_max_batch_size = 8
with caplog.at_level(logging.WARNING):
config.pipeline_batch_size = 7
config.freeze()
assert len(caplog.records) == 1
import re
assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None
Loading