Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GELF support #532

Merged
merged 2 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/source/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ path = parsedmarc
[syslog]
server = localhost
port = 514

[gelf]
host = logger
port = 12201
mode = tcp
```

The full set of configuration options are:
Expand Down Expand Up @@ -343,6 +348,10 @@ The full set of configuration options are:
:::{note}
Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal).
:::
- `gelf`
- `host` - str: The GELF server name or IP address
- `port` - int: The port to use
- `mode` - str: The GELF transport type to use. Valid modes: `tcp`, `udp`, `tls`

:::{warning}
It is **strongly recommended** to **not** use the `nameservers`
Expand Down
56 changes: 54 additions & 2 deletions parsedmarc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from parsedmarc import get_dmarc_reports_from_mailbox, watch_inbox, \
parse_report_file, get_dmarc_reports_from_mbox, elastic, opensearch, \
kafkaclient, splunk, save_output, email_results, ParserError, \
__version__, InvalidDMARCReport, s3, syslog, loganalytics
__version__, InvalidDMARCReport, s3, syslog, loganalytics, gelf
from parsedmarc.mail import IMAPConnection, MSGraphConnection, GmailConnection
from parsedmarc.mail.graph import AuthMethod

Expand Down Expand Up @@ -145,6 +145,12 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

try:
if opts.gelf_host:
gelf_client.save_aggregate_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

if opts.hec:
try:
aggregate_reports_ = reports_["aggregate_reports"]
Expand Down Expand Up @@ -212,6 +218,12 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

try:
if opts.gelf_host:
gelf_client.save_forensic_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

if opts.hec:
try:
forensic_reports_ = reports_["forensic_reports"]
Expand Down Expand Up @@ -279,6 +291,12 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Syslog Error: {0}".format(error_.__str__()))

try:
if opts.gelf_host:
gelf_client.save_smtp_tls_report_to_gelf(report)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

if opts.hec:
try:
smtp_tls_reports_ = reports_["smtp_tls_reports"]
Expand Down Expand Up @@ -491,7 +509,10 @@ def process_reports(reports_):
la_dcr_immutable_id=None,
la_dcr_aggregate_stream=None,
la_dcr_forensic_stream=None,
la_dcr_smtp_tls_stream=None
la_dcr_smtp_tls_stream=None,
gelf_host=None,
gelf_port=None,
gelf_mode=None,
)
args = arg_parser.parse_args()

Expand Down Expand Up @@ -992,6 +1013,27 @@ def process_reports(reports_):
opts.la_dcr_smtp_tls_stream = \
log_analytics_config.get("dcr_smtp_tls_stream")

if "gelf" in config.sections():
gelf_config = config["gelf"]
if "host" in gelf_config:
opts.gelf_host = gelf_config["host"]
else:
logger.critical("host setting missing from the "
"gelf config section")
exit(-1)
if "port" in gelf_config:
opts.gelf_port = gelf_config["port"]
else:
logger.critical("port setting missing from the "
"gelf config section")
exit(-1)
if "mode" in gelf_config:
opts.gelf_mode = gelf_config["mode"]
else:
logger.critical("mode setting missing from the "
"gelf config section")
exit(-1)

logger.setLevel(logging.ERROR)

if opts.warnings:
Expand Down Expand Up @@ -1130,6 +1172,16 @@ def process_reports(reports_):
except Exception as error_:
logger.error("Kafka Error: {0}".format(error_.__str__()))

if opts.gelf_host:
try:
gelf_client = gelf.GelfClient(
host=opts.gelf_host,
port=int(opts.gelf_port),
mode=opts.gelf_mode,
)
except Exception as error_:
logger.error("GELF Error: {0}".format(error_.__str__()))

kafka_aggregate_topic = opts.kafka_aggregate_topic
kafka_forensic_topic = opts.kafka_forensic_topic
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic
Expand Down
64 changes: 64 additions & 0 deletions parsedmarc/gelf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-

import logging
import logging.handlers
import json
import threading

from parsedmarc import parsed_aggregate_reports_to_csv_rows, \
parsed_forensic_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows
from pygelf import GelfTcpHandler, GelfUdpHandler, GelfTlsHandler


log_context_data = threading.local()


class ContextFilter(logging.Filter):

def filter(self, record):
record.parsedmarc = log_context_data.parsedmarc
return True


class GelfClient(object):
"""A client for the Graylog Extended Log Format"""

def __init__(self, host, port, mode):
"""
Initializes the GelfClient
Args:
host (str): The GELF host
port (int): The GELF port
mode (str): The GELF transport mode
"""
self.host = host
self.port = port
self.logger = logging.getLogger('parsedmarc_syslog')
self.logger.setLevel(logging.INFO)
self.logger.addFilter(ContextFilter())
self.gelf_mode = {
'udp': GelfUdpHandler,
'tcp': GelfTcpHandler,
'tls': GelfTlsHandler,
}
self.handler = self.gelf_mode[mode](host=self.host, port=self.port,
include_extra_fields=True)
self.logger.addHandler(self.handler)

def save_aggregate_report_to_gelf(self, aggregate_reports):
rows = parsed_aggregate_reports_to_csv_rows(aggregate_reports)
for row in rows:
log_context_data.parsedmarc = row
self.logger.info('parsedmarc aggregate report')

log_context_data.parsedmarc = None

def save_forensic_report_to_gelf(self, forensic_reports):
rows = parsed_forensic_reports_to_csv_rows(forensic_reports)
for row in rows:
self.logger.info(json.dumps(row))

def save_smtp_tls_report_to_gelf(self, smtp_tls_reports):
rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports)
for row in rows:
self.logger.info(json.dumps(row))
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies = [
"tqdm>=4.31.1",
"urllib3>=1.25.7",
"xmltodict>=0.12.0",
"pygelf>=0.4.2",
]

[project.scripts]
Expand Down