Update DFP paths
dagardner-nv committed Nov 8, 2024
1 parent c77ce83 commit 91700f9
Showing 1 changed file with 11 additions and 11 deletions.
@@ -174,7 +174,7 @@ Subclass of `DateTimeColumn`, counts the unique occurrences of a value in `group
![Input Stages](img/dfp_input_config.png)

#### Source Stage (`MultiFileSource`)
-The `MultiFileSource` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py`) receives a path or list of paths (`filenames`), which are collectively emitted into the pipeline as a single [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) object. The paths may include wildcards `*` as well as URLs (for example, `s3://path`) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others, as defined by [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files). In addition, paths can be cached locally by prefixing them with `filecache::` (for example, `filecache::s3://bucket-name/key-name`).
+The `MultiFileSource` (`python/morpheus/morpheus/modules/input/multi_file_source.py`) receives a path or list of paths (`filenames`), which are collectively emitted into the pipeline as a single [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) object. The paths may include wildcards `*` as well as URLs (for example, `s3://path`) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others, as defined by [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files). In addition, paths can be cached locally by prefixing them with `filecache::` (for example, `filecache::s3://bucket-name/key-name`).

> **Note:** This stage does not actually download the data files, allowing the file list to be filtered and batched prior to being downloaded.
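
As an illustration only, a minimal sketch of wiring this source into a pipeline is shown below; the `morpheus.stages.input.multi_file_source` import path and the example `filenames` values are assumptions, not taken verbatim from the example pipelines.

```python
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.multi_file_source import MultiFileSource  # assumed import path

config = Config()
pipeline = LinearPipeline(config)

# Wildcards, fsspec URLs, and the `filecache::` prefix can be mixed freely;
# the files are only opened by downstream stages, not here.
pipeline.set_source(MultiFileSource(config, filenames=[
    "/data/auth_logs/AUTH_LOG-*.json",
    "filecache::s3://bucket-name/auth-logs/*.json",
]))
```
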
@@ -187,7 +187,7 @@ The `MultiFileSource` (`examples/digital_fingerprinting/production/morpheus/dfp/


#### File Batcher Stage (`DFPFileBatcherStage`)
-The `DFPFileBatcherStage` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py`) groups data in the incoming `DataFrame` into batches spanning a time period (one day by default), optionally filtering the incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. It assumes that the date of the logs can be easily inferred, for example by encoding the creation time in the file name (`AUTH_LOG-2022-08-21T22.05.23Z.json`) or by using the modification time reported by the file system. The actual method for extracting the date is encoded in a user-supplied `date_conversion_func` function (more on this later).
+The `DFPFileBatcherStage` (`python/morpheus_dfp/morpheus_dfp/stages/dfp_file_batcher_stage.py`) groups data in the incoming `DataFrame` into batches spanning a time period (one day by default), optionally filtering the incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. It assumes that the date of the logs can be easily inferred, for example by encoding the creation time in the file name (`AUTH_LOG-2022-08-21T22.05.23Z.json`) or by using the modification time reported by the file system. The actual method for extracting the date is encoded in a user-supplied `date_conversion_func` function (more on this later).

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -219,7 +219,7 @@ pipeline.add_stage(
> **Note:** If `date_conversion_func` returns timezone-aware timestamps, then `start_time` and `end_time`, if not `None`, must also be timezone-aware `datetime` objects.
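
For illustration, the sketch below shows one way to satisfy the `date_conversion_func` contract by parsing the timestamp embedded in file names such as `AUTH_LOG-2022-08-21T22.05.23Z.json`, falling back to the file system's modification time. The regular expression, and the assumption that the function receives a single `fsspec` `OpenFile`, are illustrative and not the exact helper shipped with the example.

```python
import re
from datetime import datetime, timezone

import fsspec

# Matches timestamps encoded as 2022-08-21T22.05.23Z in the file name.
FILE_TS_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})T(\d{2})\.(\d{2})\.(\d{2})Z")


def date_conversion_func(file_object: fsspec.core.OpenFile) -> datetime:
    """Return a timezone-aware timestamp for a single input file."""
    match = FILE_TS_RE.search(file_object.path)
    if match is not None:
        year, month, day, hour, minute, second = (int(g) for g in match.groups())
        return datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)

    # Fall back to the modification time reported by the file system.
    modified = file_object.fs.modified(file_object.path)
    return modified if modified.tzinfo is not None else modified.replace(tzinfo=timezone.utc)
```
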
#### File to DataFrame Stage (`DFPFileToDataFrameStage`)
-The `DFPFileToDataFrameStage` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py`) stage receives a `list` of [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) and loads them into a single `DataFrame`, which is then emitted into the pipeline. When the parent stage is `DFPFileBatcherStage`, each batch (typically one day) is concatenated into a single `DataFrame`. If the parent was `MultiFileSource`, the entire dataset is loaded into a single `DataFrame`. Because of this, it is important to choose a `period` argument for `DFPFileBatcherStage` small enough that each batch can fit into memory.
+The `DFPFileToDataFrameStage` (`python/morpheus_dfp/morpheus_dfp/stages/dfp_file_to_df.py`) stage receives a `list` of [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) and loads them into a single `DataFrame`, which is then emitted into the pipeline. When the parent stage is `DFPFileBatcherStage`, each batch (typically one day) is concatenated into a single `DataFrame`. If the parent was `MultiFileSource`, the entire dataset is loaded into a single `DataFrame`. Because of this, it is important to choose a `period` argument for `DFPFileBatcherStage` small enough that each batch can fit into memory.
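
A construction sketch follows; the keyword arguments shown (particularly `schema`, `file_type`, and `parser_kwargs`) are assumptions based on typical usage, `source_schema` stands in for a column schema defined elsewhere, and `pipeline`/`config` are the objects created for the source stage above.

```python
from morpheus.common import FileTypes  # assumed import location for the FileTypes enum
from morpheus_dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage

# `source_schema` is a DataFrameInputSchema describing how the raw JSON fields
# map onto DataFrame columns.
pipeline.add_stage(
    DFPFileToDataFrameStage(config,
                            schema=source_schema,
                            file_type=FileTypes.JSON,
                            parser_kwargs={"lines": False, "orient": "records"},
                            cache_dir="./.cache/dfp"))
```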

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -251,7 +251,7 @@ This final stage will write all received messages to a single output file in eit
| `overwrite` | `bool` | Optional, defaults to `False`. If the file specified in `filename` already exists, it will be overwritten if this option is set to `True` |

#### Write to S3 Stage (`WriteToS3Stage`)
-The {py:obj}`~dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resulting anomaly detections to S3. The `WriteToS3Stage` decouples the S3-specific operations from the Morpheus stage, and as such receives an `s3_writer` argument.
+The {py:obj}`~morpheus_dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resulting anomaly detections to S3. The `WriteToS3Stage` decouples the S3-specific operations from the Morpheus stage, and as such receives an `s3_writer` argument.
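
Because the S3 interaction is delegated to the caller, a sketch might look like the following; the assumed `s3_writer` signature (a callable receiving one outgoing `DataFrame` and returning it), the bucket name, and the key naming are illustrative assumptions only.

```python
import uuid

import boto3

from morpheus_dfp.stages.write_to_s3_stage import WriteToS3Stage  # assumed import path

s3_client = boto3.client("s3")


def s3_writer(df):
    """Illustrative only: uploads one batch of detections as a JSON-lines object."""
    s3_client.put_object(Bucket="my-detections-bucket",
                         Key=f"dfp-detections/{uuid.uuid4()}.json",
                         Body=df.to_json(orient="records", lines=True))
    return df


pipeline.add_stage(WriteToS3Stage(config, s3_writer=s3_writer))
```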

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -262,7 +262,7 @@ The {py:obj}`~dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resu
These stages are common to both the training and inference pipelines. Unlike the input and output stages, they are specific to the DFP pipeline and are intended to be configured but not replaced.

#### Split Users Stage (`DFPSplitUsersStage`)
-The {py:obj}`~dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receives an incoming `DataFrame` and emits a `list` of `DFPMessageMeta`, where each `DFPMessageMeta` represents the records associated with a given user. This allows downstream stages to perform all necessary operations on a per-user basis.
+The {py:obj}`~morpheus_dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receives an incoming `DataFrame` and emits a `list` of `DFPMessageMeta`, where each `DFPMessageMeta` represents the records associated with a given user. This allows downstream stages to perform all necessary operations on a per-user basis.

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -273,7 +273,7 @@ The {py:obj}`~dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receive
| `only_users` | `List[str]` or `None` | Limit records to a specific list of users, when `include_generic` is `True` the generic user's records will also be limited to the users in this list. Mutually exclusive with `skip_users`. |
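
Reusing the `pipeline` and `config` objects from the source sketch above, and assuming the arguments listed here are passed as keywords, a construction sketch might look like this:

```python
from morpheus_dfp.stages.dfp_split_users_stage import DFPSplitUsersStage

pipeline.add_stage(
    DFPSplitUsersStage(config,
                       include_generic=True,     # also emit the combined "generic" user
                       include_individual=True,  # and one DFPMessageMeta per real user
                       skip_users=["svc_account"],
                       only_users=None))
```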

#### Rolling Window Stage (`DFPRollingWindowStage`)
-The {py:obj}`~dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage performs several key pieces of functionality for DFP.
+The {py:obj}`~morpheus_dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage performs several key pieces of functionality for DFP.
<!-- Work-around for https://github.com/errata-ai/vale/issues/874 -->
<!-- vale off -->
1. This stage keeps a moving window of logs on a per-user basis
@@ -299,7 +299,7 @@ The {py:obj}`~dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage p
> **Note:** This stage computes a row hash for the first and last rows of the incoming `DataFrame`; as such, all data contained must be hashable. Any non-hashable values, such as `lists`, should be dropped or converted into hashable types in the `DFPFileToDataFrameStage`.
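
A construction sketch is below; the `min_history`, `min_increment`, `max_history`, and `cache_dir` keywords are assumptions (the argument table is truncated in this excerpt), chosen to illustrate a per-user window of at least 300 rows.

```python
from morpheus_dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage

pipeline.add_stage(
    DFPRollingWindowStage(config,
                          min_history=300,    # do not emit until a user has 300 rows
                          min_increment=300,  # then only re-emit after 300 new rows arrive
                          max_history="60d",  # keep at most 60 days of history per user
                          cache_dir="./.cache/dfp"))
```
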
#### Preprocessing Stage (`DFPPreprocessingStage`)
-In the {py:obj}`~dfp.stages.dfp_preprocessing_stage.DFPPreprocessingStage` stage, the actual preprocessing logic is defined by the `input_schema` argument. Since this stage occurs in the pipeline after the `DFPFileBatcherStage` and `DFPSplitUsersStage` stages, all records in the incoming `DataFrame` correspond to a single user within a specific time period, allowing columns to be computed on a per-user, per-time-period basis, such as the `logcount` and `locincrement` features mentioned above. This makes the type of processing performed in this stage different from that performed in the `DFPFileToDataFrameStage`.
+In the {py:obj}`~morpheus_dfp.stages.dfp_preprocessing_stage.DFPPreprocessingStage` stage, the actual preprocessing logic is defined by the `input_schema` argument. Since this stage occurs in the pipeline after the `DFPFileBatcherStage` and `DFPSplitUsersStage` stages, all records in the incoming `DataFrame` correspond to a single user within a specific time period, allowing columns to be computed on a per-user, per-time-period basis, such as the `logcount` and `locincrement` features mentioned above. This makes the type of processing performed in this stage different from that performed in the `DFPFileToDataFrameStage`.
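
For context, the sketch below shows the general shape of an `input_schema` that derives a per-user, per-time-period feature such as `logcount`; the `morpheus.utils.column_info` import path and the exact column classes are assumptions based on the column types referenced earlier in this guide.

```python
from datetime import datetime

from morpheus.utils.column_info import (ColumnInfo, DataFrameInputSchema,
                                        DateTimeColumn, IncrementColumn)
from morpheus_dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage

preprocess_schema = DataFrameInputSchema(column_info=[
    DateTimeColumn(name="timestamp", dtype=datetime, input_name="timestamp"),
    ColumnInfo(name="username", dtype=str),
    # Counts the rows for each user within the batch's time period, yielding `logcount`.
    IncrementColumn(name="logcount", dtype=int, input_name="timestamp",
                    groupby_column="username"),
])

pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))
```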

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -316,7 +316,7 @@ After training the generic model, individual user models can be trained. Individ
### Training Stages

#### Training Stage (`DFPTraining`)
-The {py:obj}`~dfp.stages.dfp_training.DFPTraining` stage trains a model for each incoming `DataFrame` and emits an instance of `morpheus.messages.ControlMessage` containing the trained model.
+The {py:obj}`~morpheus_dfp.stages.dfp_training.DFPTraining` stage trains a model for each incoming `DataFrame` and emits an instance of `morpheus.messages.ControlMessage` containing the trained model.

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -326,7 +326,7 @@ The {py:obj}`~dfp.stages.dfp_training.DFPTraining` trains a model for each incom
| `validation_size` | `float` | Proportion of the input dataset to use for training validation. Should be between 0.0 and 1.0. Default is 0.0.|
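
A construction sketch using these arguments (the values, and the `epochs` keyword, are illustrative assumptions):

```python
from morpheus_dfp.stages.dfp_training import DFPTraining

pipeline.add_stage(
    DFPTraining(config,
                epochs=30,             # assumed keyword; number of training epochs
                validation_size=0.1))  # hold out 10% of rows for validation
```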

#### MLflow Model Writer Stage (`DFPMLFlowModelWriterStage`)
-The {py:obj}`~dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stage publishes trained models to MLflow, skipping any model which lacked sufficient training data (the current required minimum is 300 log records).
+The {py:obj}`~morpheus_dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stage publishes trained models to MLflow, skipping any model which lacked sufficient training data (the current required minimum is 300 log records).
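
A construction sketch follows; the formatter keywords are assumptions (the argument table is truncated in this excerpt), showing the common pattern of templating the MLflow model and experiment names on the user ID.

```python
from morpheus_dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage

pipeline.add_stage(
    DFPMLFlowModelWriterStage(config,
                              model_name_formatter="DFP-{user_id}",
                              experiment_name_formatter="dfp/training/{reg_model_name}"))
```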

| Argument | Type | Description |
| -------- | ---- | ----------- |
@@ -343,7 +343,7 @@ The {py:obj}`~dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stag
### Inference Stages

#### Inference Stage (`DFPInferenceStage`)
-The {py:obj}`~dfp.stages.dfp_inference_stage.DFPInferenceStage` stage loads models from MLflow and performs inference against them. This stage emits a message containing the original `DataFrame` along with new columns containing the z-score (`mean_abs_z`), as well as the name and version of the model that generated that score (`model_version`). For each feature in the model, three additional columns will also be added:
+The {py:obj}`~morpheus_dfp.stages.dfp_inference_stage.DFPInferenceStage` stage loads models from MLflow and performs inference against them. This stage emits a message containing the original `DataFrame` along with new columns containing the z-score (`mean_abs_z`), as well as the name and version of the model that generated that score (`model_version`). For each feature in the model, three additional columns will also be added:
* `<feature name>_loss` : The loss
* `<feature name>_z_loss` : The loss z-score
* `<feature name>_pred` : The predicted value
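
As a sketch, loading the per-user models published by the training pipeline might look like this; the `model_name_formatter` keyword is an assumption and must match the formatter used when the models were written.

```python
from morpheus_dfp.stages.dfp_inference_stage import DFPInferenceStage

pipeline.add_stage(
    DFPInferenceStage(config, model_name_formatter="DFP-{user_id}"))
```
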
@@ -370,4 +370,4 @@ The {py:obj}`~morpheus.stages.postprocess.filter_detections_stage.FilterDetectio
| `field_name` | `str` | `probs` | Name of the tensor (`filter_source=FilterSource.TENSOR`) or DataFrame column (`filter_source=FilterSource.DATAFRAME`) to use as the filter criteria. |
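
For example, keeping only rows whose `mean_abs_z` score exceeds a threshold might be configured as below; the `morpheus.common` import location for `FilterSource` and the threshold value are assumptions.

```python
from morpheus.common import FilterSource
from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage

pipeline.add_stage(
    FilterDetectionsStage(config,
                          threshold=2.0,                        # keep rows scoring above 2.0
                          filter_source=FilterSource.DATAFRAME, # filter on a DataFrame column
                          field_name="mean_abs_z"))
```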

#### Post Processing Stage (`DFPPostprocessingStage`)
-The {py:obj}`~dfp.stages.dfp_postprocessing_stage.DFPPostprocessingStage` stage adds a new `event_time` column to the DataFrame indicating the time at which Morpheus detected the anomalous messages, and replaces any `NaN` values with the string value `'NaN'`.
+The {py:obj}`~morpheus_dfp.stages.dfp_postprocessing_stage.DFPPostprocessingStage` stage adds a new `event_time` column to the DataFrame indicating the time at which Morpheus detected the anomalous messages, and replaces any `NaN` values with the string value `'NaN'`.
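
Conceptually, the transformation is equivalent to the small pandas sketch below (the actual stage operates on the message's payload rather than a bare `DataFrame`):

```python
from datetime import datetime, timezone

import pandas as pd


def postprocess(df: pd.DataFrame) -> pd.DataFrame:
    # Record when Morpheus flagged these rows ...
    df["event_time"] = datetime.now(timezone.utc).isoformat()
    # ... and make NaN values JSON-friendly by replacing them with the string 'NaN'.
    return df.fillna("NaN")
```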
