Skip to content

Commit

Permalink
Add GELF support (#532)
Browse files Browse the repository at this point in the history
* Implement the ability to log to a GELF server/input, via the use of pygelf.

* Fix flake8 style checks.
  • Loading branch information
lingfish authored Aug 24, 2024
1 parent efe4893 commit 11e0461
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 2 deletions.
9 changes: 9 additions & 0 deletions docs/source/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ path = parsedmarc
[syslog]
server = localhost
port = 514

[gelf]
host = logger
port = 12201
mode = tcp
```

The full set of configuration options are:
Expand Down Expand Up @@ -345,6 +350,10 @@ The full set of configuration options are:
:::{note}
Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
:::
- `gelf`
- `host` - str: The GELF server name or IP address
- `port` - int: The port to use
- `mode` - str: The GELF transport type to use. Valid modes: `tcp`, `udp`, `tls`
:::{warning}
It is **strongly recommended** to **not** use the `nameservers`
Expand Down
56 changes: 54 additions & 2 deletions parsedmarc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \
parse_report_file, get_dmarc_reports_from_mbox, elastic, opensearch, \
kafkaclient, splunk, save_output, email_results, ParserError, \
__version__, InvalidDMARCReport, s3, syslog, loganalytics
__version__, InvalidDMARCReport, s3, syslog, loganalytics, gelf
from parsedmarc.mail import IMAPConnection, MSGraphConnection, GmailConnection
from parsedmarc.mail.graph import AuthMethod

Expand Down Expand Up @@ -147,6 +147,12 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

try:
if opts.gelf_host:
gelf_client.save_aggregate_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

if opts.hec:
try:
aggregate_reports_ = reports_["aggregate_reports"]
Expand Down Expand Up @@ -216,6 +222,12 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

try:
if opts.gelf_host:
gelf_client.save_forensic_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

if opts.hec:
try:
forensic_reports_ = reports_["forensic_reports"]
Expand Down Expand Up @@ -285,6 +297,12 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

try:
if opts.gelf_host:
gelf_client.save_smtp_tls_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

if opts.hec:
try:
smtp_tls_reports_ = reports_["smtp_tls_reports"]
Expand Down Expand Up @@ -499,7 +517,10 @@ def process_reports(reports_):
la_dcr_immutable_id=None,
la_dcr_aggregate_stream=None,
la_dcr_forensic_stream=None,
la_dcr_smtp_tls_stream=None
la_dcr_smtp_tls_stream=None,
gelf_host=None,
gelf_port=None,
gelf_mode=None,
)
args = arg_parser.parse_args()

Expand Down Expand Up @@ -1006,6 +1027,27 @@ def process_reports(reports_):
opts.la_dcr_smtp_tls_stream = \
log_analytics_config.get("dcr_smtp_tls_stream")

if "gelf" in config.sections():
gelf_config = config["gelf"]
if "host" in gelf_config:
opts.gelf_host = gelf_config["host"]
else:
logger.critical("host setting missing from the "
"gelf config section")
exit(-1)
if "port" in gelf_config:
opts.gelf_port = gelf_config["port"]
else:
logger.critical("port setting missing from the "
"gelf config section")
exit(-1)
if "mode" in gelf_config:
opts.gelf_mode = gelf_config["mode"]
else:
logger.critical("mode setting missing from the "
"gelf config section")
exit(-1)

logger.setLevel(logging.ERROR)

if opts.warnings:
Expand Down Expand Up @@ -1162,6 +1204,16 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))

if opts.gelf_host:
try:
gelf_client = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

kafka_aggregate_topic = opts.kafka_aggregate_topic
kafka_forensic_topic = opts.kafka_forensic_topic
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
Expand Down
64 changes: 64 additions & 0 deletions parsedmarc/gelf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-

import logging
import logging.handlers
import json
import threading

from parsedmarc import parsed_aggregate_reports_to_csv_rows, \
parsed_forensic_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows
from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler


log_context_data = threading.local()


class ContextFilter(logging.Filter):

def filter(self, record):
record.parsedmarc = log_context_data.parsedmarc
return True


class GelfClient(object):
"""A client for the Graylog Extended Log Format"""

def __init__(self, host, port, mode):
"""
Initializes the GelfClient
Args:
host (str): The GELF host
port (int): The GELF port
mode (str): The GELF transport mode
"""
self.host = host
self.port = port
self.logger = logging.getLogger('parsedmarc_syslog')
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
'udp': GelfUdpHandler,
'tcp': GelfTcpHandler,
'tls': GelfTlsHandler,
}
self.handler = self.gelf_mode[mode](host=self.host, port=self.port,
include_extra_fields=True)
self.logger.addHandler(self.handler)

def save_aggregate_report_to_gelf(self, aggregate_reports):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info('parsedmarc aggregate report')

log_context_data.parsedmarc = None

def save_forensic_report_to_gelf(self, forensic_reports):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))

def save_smtp_tls_report_to_gelf(self, smtp_tls_reports):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies = [
"tqdm>=4.31.1",
"urllib3>=1.25.7",
"xmltodict>=0.12.0",
"pygelf>=0.4.2",
]

[project.scripts]
Expand Down

0 comments on commit 11e0461

Please sign in to comment.