Misc cleanups to example pipelines #2049

Merged
5 changes: 2 additions & 3 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
@@ -482,7 +482,6 @@ To explicitly set the output format we could specify the `file_type` argument to
```python
import logging
import os
- import tempfile

import click

@@ -542,7 +541,7 @@ MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT']
@click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
@click.option(
"--output_file",
- default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
+ default=".tmp/output/phishing_detections.jsonlines",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(use_stage_function: bool,
@@ -633,7 +632,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis
monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \
add-scores --label=is_phishing \
serialize \
- to-file --filename=/tmp/detections.jsonlines --overwrite
+ to-file --filename=.tmp/output/phishing_detections_cli.jsonlines --overwrite
```

## Stage Constructors
6 changes: 3 additions & 3 deletions examples/abp_nvsmi_detection/README.md
@@ -140,7 +140,7 @@ morpheus --log_level=DEBUG \
`# 7th Stage: Convert from objects back into strings. Ignore verbose input data` \
serialize --include 'mining' \
`# 8th Stage: Write out the JSON lines to the detections.jsonlines file` \
- to-file --filename=detections.jsonlines --overwrite
+ to-file --filename=.tmp/output/abp_nvsmi_detections.jsonlines --overwrite
```

If successful, the following should be displayed:
@@ -217,15 +217,15 @@ Added stage: <add-class-5; AddClassificationsStage(threshold=0.5, labels=[], pre
└─ morpheus.ControlMessage -> morpheus.ControlMessage
Added stage: <serialize-6; SerializeStage(include=['mining'], exclude=['^ID$', '^_ts_'], fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
- Added stage: <to-file-7; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
+ Added stage: <to-file-7; WriteToFileStage(filename=.tmp/output/abp_nvsmi_detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1656353254.9919598
Inference Rate[Complete]: 1242inf [00:00, 1863.04inf/s]
====Pipeline Complete====
```

- The output file `detections.jsonlines` will contain a single boolean value for each input line. At some point the values will switch from `0` to `1`:
+ The output file `.tmp/output/abp_nvsmi_detections.jsonlines` will contain a single boolean value for each input line. At some point the values will switch from `0` to `1`:

```
...
4 changes: 2 additions & 2 deletions examples/abp_pcap_detection/README.md
@@ -97,7 +97,7 @@ python examples/abp_pcap_detection/run.py
```
Note: Both Morpheus and Triton Inference Server containers must have access to the same GPUs in order for this example to work.

- The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `pcap_out.jsonlines`.
+ The pipeline will process the input `abp_pcap_dump.jsonlines` sample data and write it to `.tmp/output/pcap_out.jsonlines`.

### CLI Example
The above example is illustrative of using the Python API to build a custom Morpheus Pipeline.
@@ -118,6 +118,6 @@ morpheus --log_level INFO --plugin "examples/abp_pcap_detection/abp_pcap_preproc
monitor --description "Add classification rate" --unit "add-class" \
serialize \
monitor --description "Serialize rate" --unit ser \
to-file --filename "pcap_out.jsonlines" --overwrite \
to-file --filename ".tmp/output/pcap_out_cli.jsonlines" --overwrite \
monitor --description "Write to file rate" --unit "to-file"
```
2 changes: 1 addition & 1 deletion examples/abp_pcap_detection/run.py
@@ -65,7 +65,7 @@
)
@click.option(
"--output_file",
default="./pcap_out.jsonlines",
default="./.tmp/output/pcap_out.jsonlines",
help="The path to the file where the inference output will be saved.",
)
@click.option(
4 changes: 2 additions & 2 deletions examples/cpu_only/README.md
@@ -53,7 +53,7 @@ Options:
To launch the configured Morpheus pipeline with the sample data that is provided in `examples/data`, run the following:

```bash
- python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/out.jsonlines
+ python examples/cpu_only/run.py --use_cpu_only --in_file=examples/data/email.jsonlines --out_file=.tmp/output/cpu_only_out.jsonlines
```

### CLI Example
Expand All @@ -68,5 +68,5 @@ morpheus --log_level INFO \
deserialize \
monitor --description "deserialize" \
serialize \
- to-file --filename=.tmp/out.jsonlines --overwrite
+ to-file --filename=.tmp/output/cpu_only_cli_out.jsonlines --overwrite
```
2 changes: 1 addition & 1 deletion examples/cpu_only/run.py
@@ -61,7 +61,7 @@
"--out_file",
help="Output file",
type=click.Path(dir_okay=False),
default="output.csv",
default=".tmp/output/cpu_only_out.csv",
required=True,
)
def run_pipeline(log_level: int, use_cpu_only: bool, in_file: pathlib.Path, out_file: pathlib.Path):
3 changes: 1 addition & 2 deletions examples/developer_guide/2_1_real_world_phishing/run.py
@@ -17,7 +17,6 @@

import logging
import os
- import tempfile

import click
from recipient_features_stage import RecipientFeaturesStage
@@ -77,7 +76,7 @@
@click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
@click.option(
"--output_file",
- default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
+ default=".tmp/output/phishing_detections.jsonlines",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(use_stage_function: bool,
4 changes: 2 additions & 2 deletions examples/gnn_fraud_detection_pipeline/README.md
@@ -97,7 +97,7 @@ Added stage: <serialize-8; SerializeStage(include=None, exclude=None, fixed_colu
└─ morpheus.ControlMessage -> morpheus.MessageMeta
Added stage: <monitor-9; MonitorStage(description=Serialize rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
- Added stage: <to-file-10; WriteToFileStage(filename=output.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
+ Added stage: <to-file-10; WriteToFileStage(filename=.tmp/output/gnn_fraud_detection_output.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Graph construction rate[Complete]: 265 messages [00:00, 1016.18 messages/s]
@@ -128,5 +128,5 @@ morpheus --log_level INFO \
gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb.pt \
monitor --description "Add classification rate" \
serialize \
to-file --filename "output.csv" --overwrite
to-file --filename "gnn_fraud_detection_cli_output.csv" --overwrite
```
2 changes: 1 addition & 1 deletion examples/gnn_fraud_detection_pipeline/run.py
@@ -84,7 +84,7 @@
@click.option(
"--output_file",
type=click.Path(dir_okay=False),
default="output.csv",
default=".tmp/output/gnn_fraud_detection_output.csv",
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(num_threads,
2 changes: 1 addition & 1 deletion examples/log_parsing/README.md
@@ -119,6 +119,6 @@ morpheus --log_level INFO \
monitor --description "Inference rate" --unit inf \
log-postprocess --vocab_path ./models/training-tuning-scripts/sid-models/resources/bert-base-cased-vocab.txt \
--model_config_path=./models/log-parsing-models/log-parsing-config-20220418.json \
- to-file --filename ./log-parsing-output.jsonlines --overwrite \
+ to-file --filename .tmp/output/log-parsing-cli-output.jsonlines --overwrite \
monitor --description "Postprocessing rate"
```
2 changes: 1 addition & 1 deletion examples/log_parsing/run.py
@@ -60,7 +60,7 @@
)
@click.option(
"--output_file",
default="log-parsing-output.jsonlines",
default=".tmp/output/log-parsing-output.jsonlines",
help="The path to the file where the inference output will be saved.",
)
@click.option('--model_vocab_hash_file',
8 changes: 4 additions & 4 deletions examples/nlp_si_detection/README.md
@@ -131,8 +131,8 @@ morpheus --log_level=DEBUG \
filter --filter_source=TENSOR \
`# 8th Stage: Convert from objects back into strings` \
serialize --exclude '^_ts_' \
- `# 9th Stage: Write out the JSON lines to the detections.jsonlines file` \
- to-file --filename=detections.jsonlines --overwrite
+ `# 9th Stage: Write out the JSON lines to the nlp_si_detections.jsonlines file` \
+ to-file --filename=.tmp/output/nlp_si_detections.jsonlines --overwrite
```

If successful, the following should be displayed:
@@ -187,7 +187,7 @@ Added stage: <add-class-5; AddClassificationsStage(threshold=0.5, labels=[], pre
└─ morpheus.ControlMessage -> morpheus.ControlMessage
Added stage: <serialize-6; SerializeStage(include=[], exclude=['^_ts_'], fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
- Added stage: <to-file-7; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
+ Added stage: <to-file-7; WriteToFileStage(filename=.tmp/output/nlp_si_detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1656352480.541071
@@ -196,7 +196,7 @@ Inference Rate[Complete]: 93085inf [00:07, 12673.63inf/s]

```

- The output file `detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
+ The output file `.tmp/output/nlp_si_detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
* `address`
* `bank_acct`
* `credit_card`
2 changes: 1 addition & 1 deletion examples/nlp_si_detection/run.sh
@@ -29,4 +29,4 @@ morpheus --log_level=DEBUG \
add-class \
filter --filter_source=TENSOR \
serialize --exclude '^_ts_' \
- to-file --filename=detections.jsonlines --overwrite
+ to-file --filename=.tmp/output/nlp_si_detections.jsonlines --overwrite
2 changes: 1 addition & 1 deletion examples/ransomware_detection/README.md
@@ -72,7 +72,7 @@ python examples/ransomware_detection/run.py --server_url=localhost:8000 \
--sliding_window=3 \
--model_name=ransomw-model-short-rf \
--input_glob=./examples/data/appshield/*/snapshot-*/*.json \
- --output_file=./ransomware_detection_output.jsonlines
+ --output_file=.tmp/output/ransomware_detection_output.jsonlines
```

Input features for the short model can be taken from any sequence of three consecutive snapshots, such as (1, 2, 3) or (2, 3, 4). The sliding window is the number of consecutive snapshots considered when generating the input for a model: it is `3` for the short model, `5` for the medium model, and `10` for the long model.
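
To make the windowing concrete, here is a minimal, illustrative sketch (not part of the example code; the `snapshot_windows` helper is hypothetical) of how a window of three consecutive snapshots yields the candidate input sequences:

```python
# Illustrative only: enumerate consecutive snapshot-ID windows of a given size.
def snapshot_windows(snapshot_ids: list[int], window_size: int = 3) -> list[tuple[int, ...]]:
    return [
        tuple(snapshot_ids[i:i + window_size])
        for i in range(len(snapshot_ids) - window_size + 1)
    ]


# For snapshots 1..5 and the short model (window of 3):
print(snapshot_windows([1, 2, 3, 4, 5]))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
```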
49 changes: 36 additions & 13 deletions examples/ransomware_detection/run.py
@@ -21,9 +21,12 @@
from stages.create_features import CreateFeaturesRWStage
from stages.preprocessing import PreprocessingRWStage

+ from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
+ from morpheus.messages import MessageMeta
from morpheus.pipeline.linear_pipeline import LinearPipeline
+ from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.input.appshield_source_stage import AppShieldSourceStage
@@ -61,6 +64,12 @@
type=click.IntRange(min=1),
help="Max batch size to use for the model.",
)
+ @click.option(
+     "--pipeline_batch_size",
+     default=1024,
+     type=click.IntRange(min=1),
+     help=("Internal batch size for the pipeline. Can be much larger than the model batch size."),
+ )
@click.option(
"--conf_file",
type=click.STRING,
@@ -95,21 +104,22 @@
@click.option(
"--output_file",
type=click.STRING,
default="./ransomware_detection_output.jsonlines",
default=".tmp/output/ransomware_detection_output.jsonlines",
help="The path to the file where the inference output will be saved.",
)
- def run_pipeline(debug,
-                  num_threads,
-                  n_dask_workers,
-                  threads_per_dask_worker,
-                  model_max_batch_size,
-                  conf_file,
-                  model_name,
-                  server_url,
-                  sliding_window,
-                  input_glob,
-                  watch_directory,
-                  output_file):
+ def run_pipeline(debug: bool,
+                  num_threads: int,
+                  n_dask_workers: int,
+                  threads_per_dask_worker: int,
+                  model_max_batch_size: int,
+                  pipeline_batch_size: int,
+                  conf_file: str,
+                  model_name: str,
+                  server_url: str,
+                  sliding_window: int,
+                  input_glob: str,
+                  watch_directory: bool,
+                  output_file: str):

if debug:
configure_logging(log_level=logging.DEBUG)
@@ -125,6 +135,7 @@ def run_pipeline(debug,
# Below properties are specified by the command line.
config.num_threads = num_threads
config.model_max_batch_size = model_max_batch_size
+ config.pipeline_batch_size = pipeline_batch_size
config.feature_length = snapshot_fea_length * sliding_window
config.class_labels = ["pred", "score"]

@@ -222,6 +233,18 @@ def run_pipeline(debug,
# This stage logs the metrics (msg/sec) from the above stage.
pipeline.add_stage(MonitorStage(config, description="Serialize rate"))

+ @stage(needed_columns={'timestamp_process': TypeId.STRING})
+ def concat_columns(msg: MessageMeta) -> MessageMeta:
+     """
+     This stage concatenates the timestamp and pid_process columns to create a unique field.
+     """
+     with msg.mutable_dataframe() as df:
+         df['timestamp_process'] = df['timestamp'] + df['pid_process']
+
+     return msg
+
+ pipeline.add_stage(concat_columns(config))

# Add a write file stage.
# This stage writes all messages to a file.
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
6 changes: 3 additions & 3 deletions examples/root_cause_analysis/README.md
@@ -124,7 +124,7 @@ add-scores --label=is_root_cause \
`# 7th Stage: Convert from objects back into strings` \
serialize --exclude '^ts_' \
`# 8th Stage: Write results out to CSV file` \
- to-file --filename=./root-cause-binary-output.jsonlines --overwrite
+ to-file --filename=.tmp/output/root-cause-binary-output.jsonlines --overwrite
```

If successful, the following should be displayed:
@@ -177,10 +177,10 @@ Added stage: <add-scores-5; AddScoresStage(labels=('is_root_cause',), prefix=)>
└─ morpheus.ControlMessage -> morpheus.ControlMessage
Added stage: <serialize-6; SerializeStage(include=(), exclude=('^ts_',), fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
- Added stage: <to-file-7; WriteToFileStage(filename=./root-cause-binary-output.jsonlines, overwrite=True, file_type=FileTypes.Auto, include_index_col=True)>
+ Added stage: <to-file-7; WriteToFileStage(filename=.tmp/output/root-cause-binary-output.jsonlines, overwrite=True, file_type=FileTypes.Auto, include_index_col=True)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Inference rate[Complete]: 473 inf [00:01, 340.43 inf/s]
====Pipeline Complete====
```

- The output file `root-cause-binary-output.jsonlines` will contain the original kernel log messages with an additional field `is_root_cause`. The value of the new field will be the root cause probability.
+ The output file `.tmp/output/root-cause-binary-output.jsonlines` will contain the original kernel log messages with an additional field `is_root_cause`. The value of the new field will be the root cause probability.
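
As a rough illustration of consuming this output (assuming the default output path above and one JSON object per line), each record can be parsed independently; the `0.9` threshold below is an arbitrary choice for the sketch:

```python
# Illustrative only: read the JSON Lines output and keep high-probability root causes.
import json

likely_root_causes = []
with open(".tmp/output/root-cause-binary-output.jsonlines", encoding="utf-8") as fh:
    for line in fh:
        record = json.loads(line)
        if record["is_root_cause"] > 0.9:
            likely_root_causes.append(record)

print(f"{len(likely_root_causes)} kernel log lines flagged as likely root causes")
```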
3 changes: 1 addition & 2 deletions python/morpheus/morpheus/config.py
@@ -235,6 +235,7 @@ def freeze(self):
"""
self._check_cpp_mode(fix_mis_match=not self.frozen)
if not self.frozen:
+ self._validate_config()
self.frozen = True

def _check_cpp_mode(self, fix_mis_match: bool = False):
@@ -267,7 +268,6 @@ def pipeline_batch_size(self):
@pipeline_batch_size.setter
def pipeline_batch_size(self, value: int):
self._pipeline_batch_size = value
- self._validate_config()

@property
def model_max_batch_size(self):
@@ -276,7 +276,6 @@ def model_max_batch_size(self):
@model_max_batch_size.setter
def model_max_batch_size(self, value: int):
self._model_max_batch_size = value
- self._validate_config()

def _validate_config(self):
if self._pipeline_batch_size < self._model_max_batch_size:
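
The net effect of the `config.py` change is that the batch-size consistency check now runs once when the configuration is frozen rather than on every setter call. A minimal sketch of the new behavior, assuming a default-constructed `Config` and that the warning is emitted through the standard `logging` module:

```python
# Sketch of the revised validation timing; assumes a default-constructed Config.
import logging

from morpheus.config import Config

logging.basicConfig(level=logging.WARNING)

config = Config()
config.model_max_batch_size = 1024
config.pipeline_batch_size = 256  # setting the values no longer triggers the warning by itself
config.freeze()  # the pipeline_batch_size < model_max_batch_size warning is logged here
```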
2 changes: 2 additions & 0 deletions tests/morpheus/test_config.py
@@ -159,6 +159,7 @@ def test_warning_model_batch_size_less_than_pipeline_batch_size(caplog: pytest.L
config.pipeline_batch_size = 256
with caplog.at_level(logging.WARNING):
config.model_max_batch_size = 257
+ config.freeze()
assert len(caplog.records) == 1
import re
assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None
@@ -169,6 +170,7 @@ def test_warning_pipeline_batch_size_less_than_model_batch_size(caplog: pytest.L
config.model_max_batch_size = 8
with caplog.at_level(logging.WARNING):
config.pipeline_batch_size = 7
+ config.freeze()
assert len(caplog.records) == 1
import re
assert re.match(".*pipeline_batch_size < model_max_batch_size.*", caplog.records[0].message) is not None