From 33b43479f93d40a09d383ee2e52226074d7fd8d4 Mon Sep 17 00:00:00 2001 From: Salvatore Sferrazza Date: Fri, 10 Mar 2023 10:53:06 -0500 Subject: [PATCH] trap SIGINT to suppress noisy stack traces (#113) * trap SIGINT to suppress noisy stack traces * Update main.py * Update main.py * linting * Update main.py * Update .pylintrc * refactor, dump record counts on SIGINT * remove merge conflict residue * add comment * Rename factory (#109) Co-authored-by: Matt Tait <42380792+t8dogg@users.noreply.github.com> * tweak * Update main.py * Update OutputManager.py * Update MessageParser.py * linting * print summary on SIGINT * move to message parser * Update main.py * Update main.py * Update MessageParser.py * Update pylint.yml * Update ITCHMessageFactory.py --------- Co-authored-by: Mark Servidio Co-authored-by: Matt Tait <42380792+t8dogg@users.noreply.github.com> --- transcoder/message/MessageParser.py | 44 ++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/transcoder/message/MessageParser.py b/transcoder/message/MessageParser.py index fef1419..5627273 100644 --- a/transcoder/message/MessageParser.py +++ b/transcoder/message/MessageParser.py @@ -22,6 +22,8 @@ import importlib import logging import os +import signal +import sys from datetime import datetime from transcoder import LineEncoding @@ -49,6 +51,9 @@ def __init__(self, # pylint: disable=too-many-arguments),too-many-locals create_schema_enforcing_topics: bool = True, sampling_count: int = None, message_type_inclusions: str = None, message_type_exclusions: str = None, fix_header_tags: str = None, fix_separator: int = 1): + + self.start_time = None + self.source = None self.source_file_path = source_file_path self.source_file_encoding = source_file_encoding self.source_file_format_type = source_file_format_type @@ -96,6 +101,8 @@ def __init__(self, # pylint: disable=too-many-arguments),too-many-locals fix_header_tags=fix_header_tags, fix_separator=fix_separator) + signal.signal(signal.SIGINT, self.trap) + def setup_handlers(self, message_handlers: str): """Initialize MessageHandler instances to employ at runtime""" @@ -127,25 +134,28 @@ def setup_handlers(self, message_handlers: str): def process(self): """Entry point for individual message processing""" - start_time = datetime.now() + self.start_time = datetime.now() self.process_schemas() - source: Source = None if self.create_schemas_only is False: - source: Source = get_message_source(self.source_file_path, self.source_file_encoding, - self.source_file_format_type, self.source_file_endian, - skip_lines=self.skip_lines, skip_bytes=self.skip_bytes, - message_skip_bytes=self.message_skip_bytes, - line_encoding=self.line_encoding) + self.source: Source = get_message_source(self.source_file_path, self.source_file_encoding, + self.source_file_format_type, self.source_file_endian, + skip_lines=self.skip_lines, skip_bytes=self.skip_bytes, + message_skip_bytes=self.message_skip_bytes, + line_encoding=self.line_encoding) - self.process_data(source) + self.process_data() if self.output_manager is not None: self.output_manager.wait_for_completion() + self.print_summary() + + def print_summary(self): + """Print summary of the messages that were processed""" if logging.getLogger().isEnabledFor(logging.INFO): end_time = datetime.now() - time_diff = end_time - start_time + time_diff = end_time - self.start_time total_seconds = time_diff.total_seconds() if self.create_schemas_only is True: @@ -167,12 +177,12 @@ def process(self): logging.info('Message type exclusions: %s', self.message_parser.message_type_exclusions) if self.create_schemas_only is False: - logging.info('Source record count: %s', source.record_count) + logging.info('Source record count: %s', self.source.record_count) logging.info('Processed record count: %s', self.message_parser.record_count) logging.info('Processed schema count: %s', self.message_parser.total_schema_count) logging.info('Summary of message counts: %s', self.message_parser.record_type_count) logging.info('Summary of error message counts: %s', self.message_parser.error_record_type_count) - logging.info('Message rate: %s per second', round(source.record_count / total_seconds, 6)) + logging.info('Message rate: %s per second', round(self.source.record_count / total_seconds, 6)) logging.info('Total runtime in seconds: %s', round(total_seconds, 6)) logging.info('Total runtime in minutes: %s', round(total_seconds / 60, 6)) @@ -196,10 +206,10 @@ def process_schemas(self): if self.lazy_create_resources is False: self.output_manager.wait_for_schema_creation() - def process_data(self, source): + def process_data(self): """Entry point for individual message processing""" - with source: - for raw_record in source.get_message_iterator(): + with self.source: + for raw_record in self.source.get_message_iterator(): message: ParsedMessage = None try: self.error_writer.set_step(TranscodeStep.DECODE_MESSAGE) @@ -241,3 +251,9 @@ def handle_exception(self, raw_record, message, exception): if self.continue_on_error is False: raise exception + + def trap(self, _signum, _frame): + """Trap SIGINT to suppress noisy stack traces and show interim summary""" + print() + self.print_summary() + sys.exit(1)