The chaining parser reads the raw text of an event, extracts fields into a consistent schema, and constructs a cyber Message. The cyber Message is passed on to the enrichment and triaging phase of the pipeline.
If chain.enrichments.file is set, the parser converts the message fields to enrichment keys and values and writes the enrichments to HBase. See EnrichmentsConfiguration.
If original.enabled is true, the parser aggregates the original message content, along with the original topic, partition, and offset, into Parquet files written to the path specified by original.basepath. The base path supports HDFS, Ozone, or a cloud store.
If signature.enabled is true, the parser signs the content of each original message and includes the signature in the structured message.
Signing is compute intensive and is recommended only for low events-per-second deployments with anti-tamper requirements.
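The signing mechanism is internal to the parser, but as a minimal sketch of how a downstream consumer could check a signature, assuming an RSA signature (for example SHA256withRSA) computed over the original message bytes and a public key distributed as a DER file (see the openssl commands later in this section):

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
import java.security.PublicKey;
import java.security.Signature;
import java.security.spec.X509EncodedKeySpec;

public class SignatureCheck {

    // Verify a signature over the original message bytes.
    // Assumption: SHA256withRSA over the raw content; confirm the actual
    // algorithm against the parser source before relying on this.
    public static boolean verify(byte[] originalContent, byte[] signature,
                                 String publicKeyDerPath) throws Exception {
        byte[] keyBytes = Files.readAllBytes(Paths.get(publicKeyDerPath));
        PublicKey publicKey = KeyFactory.getInstance("RSA")
                .generatePublic(new X509EncodedKeySpec(keyBytes));
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(publicKey);
        verifier.update(originalContent);
        return verifier.verify(signature);
    }
}
```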
The parser chain configuration defines how to convert a textual message into the fields of a structured cyber Message.
For example, the chain below reads a netflow message in JSON format, renames the @timestamp JSON element to timestamp, and converts the string timestamp to epoch milliseconds.
Every message must produce a field called "timestamp" set to a long value of epoch milliseconds. The timestamp field populates the ts field of the cyber Message.
```json
{
  "netflow": {
    "id": "220ee8c5-07d7-48d9-8df5-7d23376cb664",
    "name": "Netflow Parser",
    "parsers": [
      {
        "id": "f812c6dc-40cc-4c77-abf8-e15fccdfea32",
        "name": "Netflow as JSON",
        "type": "com.cloudera.parserchains.parsers.JSONParser",
        "config": {
          "input": {
            "input": "original_string"
          },
          "norm": {
            "norm": "UNFOLD_NESTED"
          }
        }
      },
      {
        "id": "6b8797a2-95df-4021-83c2-60ac4c786e67",
        "name": "Field Renamer",
        "type": "com.cloudera.parserchains.parsers.RenameFieldParser",
        "config": {
          "fieldToRename": [
            {
              "from": "@timestamp",
              "to": "timestamp"
            }
          ]
        }
      },
      {
        "id": "9549004f-83e4-4d24-8baa-abdbdad06e61",
        "name": "Timestamp Parser",
        "type": "com.cloudera.parserchains.parsers.TimestampFormatParser",
        "config": {
          "fields": [
            {
              "field": "timestamp",
              "format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
              "tz": "UTC"
            }
          ]
        }
      }
    ]
  }
}
```
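For illustration, consider a hypothetical raw netflow record (field names other than @timestamp are invented for this example):

```json
{"@timestamp": "2024-01-15T10:23:45.123Z", "src_ip": "10.0.0.1", "dst_ip": "10.0.0.2"}
```

The JSONParser unfolds the JSON into fields, the RenameFieldParser renames @timestamp to timestamp, and the TimestampFormatParser converts the string to epoch milliseconds, so the resulting message would carry roughly these fields:

```
timestamp = 1705314225123
src_ip    = 10.0.0.1
dst_ip    = 10.0.0.2
```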
The topic map defines which topics the parser consumes and which parser chain converts the raw data from each topic into structured messages. The source field specifies the source of the resulting parsed cyber Message.
The topic map can optionally name the broker hosting the topic. If no broker is specified, the topic is read from the default Kafka broker.
The example topic map below uses the netflow_type1 chain for all messages from topics whose names start with netflow_type1 and the netflow_type2 chain for all messages from topics whose names start with netflow_type2. Both parser chains result in a message with source type netflow.
```json
{
  "netflow_type1_.*": {
    "chainKey": "netflow_type1",
    "source": "netflow"
  },
  "netflow_type2_.*": {
    "chainKey": "netflow_type2",
    "source": "netflow"
  }
}
```
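Topic pattern keys are Java regular expressions matched against topic names (see the topic map reference below). A minimal sketch of the matching semantics, assuming the pattern must match the full topic name:

```java
import java.util.regex.Pattern;

public class TopicMatch {
    public static void main(String[] args) {
        // "netflow_type1_.*" matches any topic whose name starts with "netflow_type1_"
        System.out.println(Pattern.matches("netflow_type1_.*", "netflow_type1_east")); // true
        // ...and does not match topics intended for the other chain
        System.out.println(Pattern.matches("netflow_type1_.*", "netflow_type2_east")); // false
    }
}
```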
When parsing fails, the parser publishes a message to the parser.error topic with a single field containing the original text and a data quality error indicating the problem.
If the parser is extracting streaming enrichments, it also sends a message to the error topic when an enrichment key field is missing or no enrichment values can be extracted from the message.
In production, monitor the parser.error topic. Determine the cause of each failure, correct the chain configuration, and then replay the failed messages.
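One quick way to confirm whether failures are accumulating is to tail the error topic with the standard Kafka console consumer (the script is kafka-console-consumer.sh in some distributions; if the error messages are Avro-encoded like the output topic, this only confirms traffic and a schema-registry-aware consumer is needed to read the payload):

```
kafka-console-consumer --bootstrap-server brokerhost1:9092 \
  --topic parser.error \
  --from-beginning
```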
Property Name | Type | Description | Required/Default | Example
---|---|---|---|---
chain.file | File path | JSON defining the parser chains and their tasks. See the parser chain configuration section for details. | Required | parser-chain.json
chain | String | Alternative to chain.file for test cases. Specifies the parser chain configuration inline rather than in a separate file. | Optional | parser chain json content
chain.topic.map.file | File path | JSON defining the mapping between topics and the parser chains used to parse the raw messages. | Required | topic-map.json
chain.topic.map | String | Alternative to chain.topic.map.file. | Optional | {"squid.*" : {"chainKey": "squid", "source" : "proxy"}}
chain.enrichments.file | File path | EnrichmentsConfigFile defining the mapping from messages to enrichment keys and values, and the storage format for the enrichments. | Optional. If not specified, no enrichments are extracted from messages. | enrichment-storage.json
topic.output | Topic name | Topic for outgoing parsed and structured messages, stored in Avro format managed by the schema registry. | Required | triage.input
original.enabled | Boolean | If true, archive the original message text. | Optional, default true | false
original.basepath | File path | Archive the original message partitions at this location in Parquet format. | Required when original.enabled=true | hdfs:/path/to/original
signature.enabled | Boolean | Determines whether the original message content is signed. When enabled, the parser runs at a much lower events-per-second rate. | Optional, default true | false
key.private.file | File path | Path to the private key .der file used to sign original messages. | Required if signature.enabled=true | private-key.der
key.private.base64 | String | For testing. Base64 encoding of the private key. | Optional |
parallelism | Integer | Number of parallel tasks to run. | Optional, default 2 | 2
checkpoint.interval.ms | Integer | Milliseconds between Flink state checkpoints. | Optional, default 60000 | 10000
schema.registry.url | URL | Schema registry REST endpoint URL. | Required | http://myregistryhost:7788/api/v1
kafka.bootstrap.servers | Comma-separated list | Kafka bootstrap server names and ports. | Required | brokerhost1:9092,brokerhost2:9092
kafka.<setting name> | Kafka setting | Settings for the Kafka producer and consumer. | Set as required by security and performance |
<broker>.kafka.bootstrap.servers | Comma-separated list | Bootstrap servers for the Kafka consumer of a non-default broker referenced in the topic map. | Required if the topic map references a non-default broker | network_broker
<broker>.kafka.<setting name> | Kafka setting | Settings for the Kafka consumer of a non-default broker referenced in the topic map. | Set as required by security and performance |
flink.job.name | String | Sets the Flink job name. | Optional, default Flink Parser | proxy parser
The example parser.properties below shows a typical configuration:

```
kafka.bootstrap.servers=<bootstrap>
kafka.client.id=parser-chain
kafka.group.id=parser-chain
kafka.auto.offset.reset=latest

# chain and topic map configurations - see above
chain.file=/path/to/chain_config.json
chain.topic.map.file=/path/to/topic_config.json

# publish parsed messages to topic.output
topic.output=enrichment.input

# messages that fail to parse are published to topic.error
topic.error=parser.error

# write the original string content to HDFS parquet files at the given location
original.enabled=true
original.basepath=/data/original/

# disable signing for better performance
signature.enabled=false

schema.registry.url=https://schemareghost:7790/api/v1
schema.registry.client.ssl.trustStorePath=truststore.jks
schema.registry.client.ssl.trustStorePassword=truststorepassword
```
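The trust store referenced above must contain the certificate authority that signed the schema registry's TLS certificate. Assuming the CA certificate is available as registry-ca.crt (a hypothetical file name), the trust store can be built with the standard keytool command:

```
keytool -importcert -noprompt \
  -alias registry-ca \
  -file registry-ca.crt \
  -keystore truststore.jks \
  -storepass truststorepassword
```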
The parser chain configuration file is a map from chain name to chain definition. Each chain definition specifies the ordered set of steps to convert raw log messages into structured messages.
Json Field | Type | Description | Required/Default | Example
---|---|---|---|---
chain name | String | Name of the parser chain. Must be unique among all chains defined in the file. The topic map refers to the chain by this name. | Required | netflow
id | String | Identifier of the parser chain. Can be a UUID or a unique number. | Required | 1 or 220ee8c5-07d7-48d9-8df5-7d23376cb664
name | String | Human readable name of the parser chain. | Required | Netflow Parser
parsers | Array of parser tasks | Ordered list of parser tasks to convert a raw log to a structured message. | Required | See sample chain
Each entry in the parsers array is a parser task with the following fields:

Json Field | Type | Description | Required/Default | Example
---|---|---|---|---
id | String | Identifier of the parser task. Can be a UUID or a unique number. | Required | 1 or 220ee8c5-07d7-48d9-8df5-7d23376cb664
name | String | Human readable name of the parser task. | Required | Netflow as JSON
type | Parser class name | Parser task class name. | Required | com.cloudera.parserchains.parsers.JSONParser
config | Parser specific configuration | Configuration for the parser. Definitions vary based on parser type. | Required | See sample chain
The topic map defines which topics the parser consumes and how the messages in the topic are converted to structured messages.
Json Map | Type | Description | Required/Default | Example
---|---|---|---|---
topic pattern key | Java regex | Pattern specifying the topic or topics containing raw messages. | Required | netflow.*
chainKey | String | Name of the parser chain used to convert the raw messages to structured messages. | Required | squid
source | String | The name of the source for messages produced by this mapping. | Required | proxy
broker | String | The name of the broker hosting the topics. Define the broker connection parameters using the <broker>.kafka.* properties. | Optional. Default: use the same broker as the parser. | externalBroker
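For example, a topic map entry that reads from a non-default broker named externalBroker would be paired with matching connection properties (the host names below are illustrative):

```json
{
  "netflow.*": {
    "chainKey": "netflow",
    "source": "netflow",
    "broker": "externalBroker"
  }
}
```

```
externalBroker.kafka.bootstrap.servers=externalhost1:9092,externalhost2:9092
```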
Below are example openssl commands to create the private key used for signing messages and the corresponding public key. Consult the cryptographic requirements of your organization before creating these files.

```
# generate a 2048-bit RSA private key in PEM format
openssl genrsa -out private_key.pem 2048

# convert the private key to the unencrypted PKCS#8 DER format referenced by key.private.file
openssl pkcs8 -topk8 -inform PEM -outform DER -in private_key.pem -out private_key.der -nocrypt

# export the matching public key in DER format for signature verification
openssl rsa -in private_key.pem -pubout -outform DER -out public_key.der
```
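To enable signing, point the parser at the generated private key (the path is illustrative):

```
signature.enabled=true
key.private.file=/path/to/private_key.der
```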
The parser reports the following data quality messages.

Severity Level | Feature | Message | Meaning
---|---|---|---
ERROR | chain_parser | Message does not contain a timestamp field. | The parsed message does not contain a field named timestamp.
ERROR | chain_parser | Timestamp is not in epoch milliseconds or seconds. | The message has a field named timestamp but it is either not numeric or not in epoch milliseconds or seconds.
ERROR | stream_enrich | Message does not define values for key field for enrichment type '<enrichment_type>' | An enrichment could not be extracted from the parsed message because the message did not contain one of the required key fields.
ERROR | stream_enrich | Message does not contain any values for enrichment type '<enrichment_type>' | There were no values to write for the enrichment. Either the message did not contain extensions or the message did not specify any of the value fields.
Submit the parser job to YARN with the Flink CLI, passing the properties file as the job argument:

```
flink run --jobmanager yarn-cluster -yjm 1024 -ytm 1024 --detached --yarnname "Parser" parser-chains-flink-0.0.1-SNAPSHOT.jar parser.properties
```