#!/usr/bin/env python
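"""Merge Zeek logs from a directory into a single, chronologically sorted, color-coded stream.

Each supported log type (conn, http, dns, ssl, x509, files, quic, ntp, dhcp) is read in either
Zeek's tab-separated or JSON format, tagged with its log type and connection UID, and printed
with a distinct ANSI color so related records are easy to spot in a terminal.
"""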
import argparse
import os
import sys
import json
from datetime import datetime, timezone, timedelta

# Define ANSI escape codes for background and foreground colors
background_colors = {
    'conn': '\033[30;41m',   # Black text on Red
    'http': '\033[30;42m',   # Black text on Green
    'dns': '\033[30;43m',    # Black text on Yellow
    'ssl': '\033[30;46m',    # Black text on Cyan
    'x509': '\033[30;45m',   # Black text on Magenta
    'files': '\033[30;46m',  # Black text on Cyan
    'quic': '\033[30;47m',   # Black text on White
    'ntp': '\033[30;100m',   # Black text on Bright Black (Dark Grey)
    'dhcp': '\033[30;101m'   # Black text on Bright Red
}

foreground_colors = {
    'conn': '\033[31m',   # Red
    'http': '\033[32m',   # Green
    'dns': '\033[33m',    # Yellow
    'ssl': '\033[36m',    # Cyan
    'x509': '\033[35m',   # Magenta
    'files': '\033[36m',  # Cyan
    'quic': '\033[37m',   # White
    'ntp': '\033[90m',    # Bright Black (Dark Grey)
    'dhcp': '\033[91m'    # Bright Red
}

reset_color = '\033[0m'
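# These are standard SGR escape sequences; e.g. print(f"{foreground_colors['dns']}dns.log{reset_color}")
# would render "dns.log" in yellow on an ANSI-capable terminal.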

# Define the file patterns for each log file type
file_patterns = {
    'conn': 'conn.log',
    'http': 'http.log',
    'dns': 'dns.log',
    'ssl': 'ssl.log',
    'x509': 'x509.log',
    'files': 'files.log',
    'quic': 'quic.log',
    'ntp': 'ntp.log',
    'dhcp': 'dhcp.log'
}
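# Additional Zeek logs (e.g. ssh.log, smtp.log, notice.log) can be supported by adding an entry
# here and a matching color in both color tables above.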

# Set up the argument parser
parser = argparse.ArgumentParser(description='Merge Zeek log files into one colored, time-sorted stream.')
parser.add_argument('-f', '--foreground', action='store_true', help='Use foreground colors instead of background colors')
parser.add_argument('-d', '--directory', type=str, required=True, help='Directory where the Zeek log files are located')
parser.add_argument('-c', '--filter-conn', action='store_true', help='Only show conn.log lines whose UID does not appear in any other log')
parser.add_argument('-n', '--no-ts-conversion', action='store_true', help='Disable conversion of ts to a human-readable format')
parser.add_argument('-t', '--timezone', type=str, default='UTC+2', help='Timezone used for timestamp conversion, e.g. UTC+2 or UTC-5 (default: UTC+2). Pass "???" to convert in UTC and label timestamps with "???".')
args = parser.parse_args()
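# Example invocations (paths are placeholders; point -d at a directory of Zeek logs):
#   ./zeek-term.py -d /path/to/zeek/logs
#   ./zeek-term.py -d /path/to/zeek/logs -f -t UTC-5    # foreground colors, UTC-5 timestamps
#   ./zeek-term.py -d /path/to/zeek/logs -c -n          # raw epoch timestamps, filtered conn.log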

log_entries = []   # (raw_ts, rendered line, color) tuples collected from every log
conn_entries = []  # (raw_ts, parsed conn.log record) tuples, held back for optional -c filtering
uids = set()       # connection UIDs seen in non-conn logs
# Select the appropriate color scheme
color_scheme = foreground_colors if args.foreground else background_colors
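# (The background scheme paints each line on a colored background; with -f only the text color
# changes, which can be easier to read on dark terminal themes.)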

# Parse a timezone string such as 'UTC+2' or 'UTC-5' into a tzinfo object and a display label
def parse_timezone(tz_str):
    if tz_str.startswith('UTC') and len(tz_str) > 3:
        try:
            # int() understands a leading '+' or '-', so no separate sign handling is needed
            return timezone(timedelta(hours=int(tz_str[3:]))), tz_str
        except ValueError:
            pass
    # '???' or anything unrecognized: convert in UTC but label the timestamps '???'
    return timezone.utc, '???'

tz, tz_name = parse_timezone(args.timezone)
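# e.g. parse_timezone('UTC+2') -> (timezone(timedelta(hours=2)), 'UTC+2')
#      parse_timezone('UTC-5') -> (timezone(timedelta(hours=-5)), 'UTC-5')
#      parse_timezone('???')   -> (timezone.utc, '???')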

def convert_ts(ts):
    """Convert a Zeek epoch timestamp (string or float) to a human-readable string."""
    ts_str = f"{float(ts):.6f}"  # normalize to exactly six decimal places
    dt = datetime.fromtimestamp(float(ts_str), tz=tz)
    return dt.strftime(f'%Y-%m-%d %H:%M:%S.{ts_str.split(".")[1]} {tz_name}')
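# e.g. with -t UTC+2 (the default): convert_ts(1623456789.123456) -> '2021-06-12 02:13:09.123456 UTC+2'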

def process_text_log_line(log_type, parts):
    """Process one tab-separated log line (parts = the split columns)."""
    raw_ts = float(parts[0])  # keep the numeric timestamp for sorting
    if not args.no_ts_conversion:
        parts[0] = convert_ts(parts[0])
    if log_type == 'conn' and len(parts) > 1:
        conn_entries.append((raw_ts, parts))  # held back so the -c filter can be applied later
        return
    if log_type == 'files' and len(parts) > 3:
        uids.add(parts[2])  # files.log: the connection UID is the third column (after the FUID)
        parts = [parts[0], log_type, parts[2]] + parts[3:]  # drop the FUID, keep the UID
    elif len(parts) > 1:
        uids.add(parts[1])  # other logs: the UID is the second column
        parts = [parts[0], log_type, parts[1]] + parts[2:]
    log_entries.append((raw_ts, '\t'.join(parts), color_scheme[log_type]))

def process_json_log_line(log_type, data):
    """Process one JSON-formatted log record."""
    raw_ts = float(data['ts'])  # keep the numeric timestamp for sorting
    if not args.no_ts_conversion:
        data['ts'] = convert_ts(data['ts'])
    if log_type == 'conn':
        conn_entries.append((raw_ts, data))  # held back so the -c filter can be applied later
        return
    if 'uid' in data:
        uids.add(data['uid'])  # collect UIDs so related conn.log flows can be filtered out
        data = {'ts': data['ts'], 'log_type': log_type, 'uid': data['uid'], **data}
    else:
        data = {'ts': data['ts'], 'log_type': log_type, **data}
    log_entries.append((raw_ts, json.dumps(data), color_scheme.get(log_type, reset_color)))
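# Non-conn records are re-emitted with log_type and uid pulled to the front; e.g. a dns.log JSON
# record like {"ts": ..., "uid": "CAbc1234", "query": "example.com"} (illustrative values) becomes
# {"ts": ..., "log_type": "dns", "uid": "CAbc1234", "query": "example.com"}.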

# Read and process each recognized log file from the target directory
for log_type, filename in file_patterns.items():
    filepath = os.path.join(args.directory, filename)
    if not os.path.isfile(filepath):
        continue
    with open(filepath, 'r') as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith('#'):
                continue  # skip blank lines and Zeek TSV header lines
            try:
                # Try to parse the line as JSON first
                process_json_log_line(log_type, json.loads(line))
            except json.JSONDecodeError:
                # Fall back to tab-separated (default Zeek ASCII) processing
                process_text_log_line(log_type, line.split('\t'))
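# Note: Zeek writes tab-separated ASCII logs by default; JSON output is typically enabled with
# `redef LogAscii::use_json = T;`. Both formats are handled above.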

# Merge the conn.log entries, optionally dropping flows already covered by another log (-c)
for raw_ts, entry in conn_entries:
    if isinstance(entry, dict):  # JSON record
        uid = entry.get('uid', '')
        rendered = json.dumps({'ts': entry['ts'], 'log_type': 'conn', 'uid': uid, **entry})
    else:  # tab-separated columns
        uid = entry[1] if len(entry) > 1 else ''
        rendered = '\t'.join([entry[0], 'conn'] + entry[1:])
    if args.filter_conn and uid in uids:
        continue  # this flow already appears in another log, so hide its conn.log line
    log_entries.append((raw_ts, rendered, color_scheme['conn']))

# Sort all entries chronologically using the raw numeric timestamp
log_entries.sort(key=lambda entry: entry[0])

# Print the sorted log entries with the color assigned to their log type
for _, entry, color in log_entries:
    print(f"{color}{entry}{reset_color}")