-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Plotting working on test dbc and test txt failling for main txt log
- Loading branch information
Kiran Jojare
committed
Jul 19, 2024
1 parent
ed611b6
commit 1967e35
Showing
4 changed files
with
146 additions
and
86 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,114 @@ | ||
import cantools | ||
import re | ||
import sys | ||
import matplotlib.pyplot as plt | ||
import argparse | ||
|
||
def parse_dbc_file(dbc_path): | ||
db = cantools.database.load_file(dbc_path) | ||
return db | ||
|
||
def parse_log_file(log_path): | ||
with open(log_path, 'r') as file: | ||
log_data = file.readlines() | ||
return log_data | ||
|
||
def decode_signals(db, log_data): | ||
signals = [] | ||
for line in log_data: | ||
match = re.match(r'.*?(\w+)\s+(\w+)\s+\d+\s+(.*?)\s+(\d+\.\d+)\s+\w\s+->\s+(.*?)\s+(\d+\.?\d*)\s+.*', line) | ||
if match: | ||
message_id = int(match.group(2), 16) | ||
timestamp = float(match.group(4)) | ||
signal_name = match.group(5) | ||
signal_value = float(match.group(6)) | ||
if message_id in db.messages: | ||
message = db.get_message_by_frame_id(message_id) | ||
decoded = {signal.name: signal_value for signal in message.signals} | ||
signals.append((timestamp, signal_name, decoded)) | ||
return signals | ||
|
||
def plot_signals(signals): | ||
timestamps, signal_names, values = zip(*signals) | ||
signal_data = {} | ||
|
||
for timestamp, signal_name, decoded in signals: | ||
if signal_name not in signal_data: | ||
signal_data[signal_name] = ([], []) | ||
signal_data[signal_name][0].append(timestamp) | ||
signal_data[signal_name][1].append(decoded[signal_name]) | ||
|
||
for signal_name, (times, vals) in signal_data.items(): | ||
plt.figure() | ||
plt.plot(times, vals, label=signal_name) | ||
plt.xlabel('Time (s)') | ||
plt.ylabel(signal_name) | ||
plt.title(f"Signal: {signal_name} Over Time") | ||
plt.legend() | ||
plt.show() | ||
def parse_dbc(file_path): | ||
try: | ||
db = cantools.database.load_file(file_path) | ||
print(f"Successfully parsed DBC file: {file_path}") | ||
return db | ||
except Exception as e: | ||
print(f"Error parsing {file_path}: {e}") | ||
return None | ||
|
||
def parse_log(db, log_file_path): | ||
try: | ||
parsed_data = [] | ||
with open(log_file_path, 'r') as log_file: | ||
current_message = None | ||
for line in log_file: | ||
if line.startswith("CAN"): | ||
if current_message: | ||
process_message(db, current_message, parsed_data) | ||
current_message = [line.strip()] | ||
elif line.strip().startswith("->"): | ||
current_message.append(line.strip()) | ||
if current_message: | ||
process_message(db, current_message, parsed_data) | ||
return parsed_data | ||
except Exception as e: | ||
print(f"Error parsing log file {log_file_path}: {e}") | ||
return None | ||
|
||
def process_message(db, message_lines, parsed_data): | ||
try: | ||
main_line = message_lines[0].split() | ||
can_id = int(main_line[2], 16) | ||
message_name = main_line[5] | ||
timestamp = float(main_line[6]) | ||
direction = main_line[7] | ||
|
||
signals = [] | ||
|
||
for signal_line in message_lines[1:]: | ||
parts = signal_line.split() | ||
signal_name = parts[1] | ||
signal_value = float(parts[2]) | ||
signals.append((signal_name, signal_value, timestamp)) | ||
|
||
parsed_data.append({ | ||
"message_name": message_name, | ||
"can_id": can_id, | ||
"timestamp": timestamp, | ||
"direction": direction, | ||
"signals": signals | ||
}) | ||
except Exception as e: | ||
print(f"Error processing message: {e}, for CAN ID (Error Frames): {can_id}") | ||
return | ||
|
||
def plot_signals(parsed_data, signal_name, start_time, end_time): | ||
timestamps = [] | ||
values = [] | ||
for data in parsed_data: | ||
for signal in data['signals']: | ||
s_name, s_value, s_timestamp = signal | ||
if s_name == signal_name and (start_time is None or s_timestamp >= start_time) and (end_time is None or s_timestamp <= end_time): | ||
timestamps.append(s_timestamp) | ||
values.append(s_value) | ||
|
||
if not timestamps: | ||
print(f"No data found for signal: {signal_name}") | ||
return | ||
|
||
plt.figure() | ||
plt.plot(timestamps, values, marker='o') | ||
plt.xlabel('Time (s)') | ||
plt.ylabel('Value') | ||
plt.title(f'Signal: {signal_name}') | ||
plt.grid(True) | ||
plt.tight_layout() | ||
plt.show() | ||
|
||
if __name__ == "__main__": | ||
dbc_path = 'test/data/test.dbc' | ||
log_path = 'test/data/test_log.txt' | ||
db = parse_dbc_file(dbc_path) | ||
log_data = parse_log_file(log_path) | ||
decoded_signals = decode_signals(db, log_data) | ||
plot_signals(decoded_signals) | ||
parser = argparse.ArgumentParser(description="Plot CAN signal data") | ||
parser.add_argument('mode', choices=['test', 'main'], help="Mode to run the script in") | ||
parser.add_argument('signal', type=str, help="Signal name to plot") | ||
parser.add_argument('start', type=float, nargs='?', default=None, help="Start time for the plot") | ||
parser.add_argument('end', type=float, nargs='?', default=None, help="End time for the plot") | ||
|
||
args = parser.parse_args() | ||
|
||
if args.mode == "test": | ||
dbc_file = "C:\\Github\\CAN-Log-Parser\\test\\data\\test.dbc" | ||
log_file = "C:\\Github\\CAN-Log-Parser\\test\\data\\test_log.txt" | ||
elif args.mode == "main": | ||
dbc_file = "C:\\Github\\CAN-Log-Parser\\data\\1200G_CAN-DBC_v01.01.00.dbc" | ||
log_file = "C:\\Github\\CAN-Log-Parser\\data\\DMW_Message_Timeout_CAN_Log.txt" | ||
|
||
db = parse_dbc(dbc_file) | ||
if db: | ||
parsed_data = parse_log(db, log_file) | ||
if parsed_data: | ||
print("Done parsing log file") | ||
if args.start is None or args.end is None: | ||
# Determine the start and end times from the log if not provided | ||
all_timestamps = [data['timestamp'] for data in parsed_data] | ||
start_time = min(all_timestamps) if args.start is None else args.start | ||
end_time = max(all_timestamps) if args.end is None else args.end | ||
else: | ||
start_time = args.start | ||
end_time = args.end | ||
|
||
plot_signals(parsed_data, args.signal, start_time, end_time) |