Skip to content

Commit

Permalink
trap SIGINT to suppress noisy stack traces (#113)
Browse files Browse the repository at this point in the history
* trap SIGINT to suppress noisy stack traces

* Update main.py

* Update main.py

* linting

* Update main.py

* Update .pylintrc

* refactor, dump record counts on SIGINT

* remove merge conflict residue

* add comment

* Rename factory (#109)

Co-authored-by: Matt Tait <[email protected]>

* tweak

* Update main.py

* Update OutputManager.py

* Update MessageParser.py

* linting

* print summary on SIGINT

* move to message parser

* Update main.py

* Update main.py

* Update MessageParser.py

* Update pylint.yml

* Update ITCHMessageFactory.py

---------

Co-authored-by: Mark Servidio <[email protected]>
Co-authored-by: Matt Tait <[email protected]>
  • Loading branch information
3 people authored Mar 10, 2023
1 parent ba6ae8d commit 33b4347
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions transcoder/message/MessageParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import importlib
import logging
import os
import signal
import sys
from datetime import datetime

from transcoder import LineEncoding
Expand Down Expand Up @@ -49,6 +51,9 @@ def __init__(self, # pylint: disable=too-many-arguments),too-many-locals
create_schema_enforcing_topics: bool = True, sampling_count: int = None,
message_type_inclusions: str = None, message_type_exclusions: str = None,
fix_header_tags: str = None, fix_separator: int = 1):

self.start_time = None
self.source = None
self.source_file_path = source_file_path
self.source_file_encoding = source_file_encoding
self.source_file_format_type = source_file_format_type
Expand Down Expand Up @@ -96,6 +101,8 @@ def __init__(self, # pylint: disable=too-many-arguments),too-many-locals
fix_header_tags=fix_header_tags,
fix_separator=fix_separator)

signal.signal(signal.SIGINT, self.trap)

def setup_handlers(self, message_handlers: str):
"""Initialize MessageHandler instances to employ at runtime"""

Expand Down Expand Up @@ -127,25 +134,28 @@ def setup_handlers(self, message_handlers: str):

def process(self):
"""Entry point for individual message processing"""
start_time = datetime.now()
self.start_time = datetime.now()
self.process_schemas()

source: Source = None
if self.create_schemas_only is False:
source: Source = get_message_source(self.source_file_path, self.source_file_encoding,
self.source_file_format_type, self.source_file_endian,
skip_lines=self.skip_lines, skip_bytes=self.skip_bytes,
message_skip_bytes=self.message_skip_bytes,
line_encoding=self.line_encoding)
self.source: Source = get_message_source(self.source_file_path, self.source_file_encoding,
self.source_file_format_type, self.source_file_endian,
skip_lines=self.skip_lines, skip_bytes=self.skip_bytes,
message_skip_bytes=self.message_skip_bytes,
line_encoding=self.line_encoding)

self.process_data(source)
self.process_data()

if self.output_manager is not None:
self.output_manager.wait_for_completion()

self.print_summary()

def print_summary(self):
"""Print summary of the messages that were processed"""
if logging.getLogger().isEnabledFor(logging.INFO):
end_time = datetime.now()
time_diff = end_time - start_time
time_diff = end_time - self.start_time
total_seconds = time_diff.total_seconds()

if self.create_schemas_only is True:
Expand All @@ -167,12 +177,12 @@ def process(self):
logging.info('Message type exclusions: %s', self.message_parser.message_type_exclusions)

if self.create_schemas_only is False:
logging.info('Source record count: %s', source.record_count)
logging.info('Source record count: %s', self.source.record_count)
logging.info('Processed record count: %s', self.message_parser.record_count)
logging.info('Processed schema count: %s', self.message_parser.total_schema_count)
logging.info('Summary of message counts: %s', self.message_parser.record_type_count)
logging.info('Summary of error message counts: %s', self.message_parser.error_record_type_count)
logging.info('Message rate: %s per second', round(source.record_count / total_seconds, 6))
logging.info('Message rate: %s per second', round(self.source.record_count / total_seconds, 6))

logging.info('Total runtime in seconds: %s', round(total_seconds, 6))
logging.info('Total runtime in minutes: %s', round(total_seconds / 60, 6))
Expand All @@ -196,10 +206,10 @@ def process_schemas(self):
if self.lazy_create_resources is False:
self.output_manager.wait_for_schema_creation()

def process_data(self, source):
def process_data(self):
"""Entry point for individual message processing"""
with source:
for raw_record in source.get_message_iterator():
with self.source:
for raw_record in self.source.get_message_iterator():
message: ParsedMessage = None
try:
self.error_writer.set_step(TranscodeStep.DECODE_MESSAGE)
Expand Down Expand Up @@ -241,3 +251,9 @@ def handle_exception(self, raw_record, message, exception):

if self.continue_on_error is False:
raise exception

def trap(self, _signum, _frame):
    """Handle SIGINT: print an interim summary, then exit with status 1.

    This method is registered as the SIGINT handler in the constructor, so
    it can fire before ``process()`` has populated ``self.start_time`` and
    ``self.source`` (both start out as ``None``). ``print_summary`` relies
    on those attributes, so the summary is skipped when they are not yet
    available; otherwise the handler itself would raise and produce the
    very stack trace it is meant to suppress.

    Args:
        _signum: Signal number supplied by the ``signal`` module (unused).
        _frame: Interrupted stack frame supplied by ``signal`` (unused).

    Raises:
        SystemExit: Always, with exit status 1.
    """
    # Move past the "^C" the terminal echoes so output starts on a clean line.
    print()

    # print_summary computes ``now - start_time`` and, unless only schemas
    # are being created, reads ``source.record_count``.
    timing_started = self.start_time is not None
    source_missing = self.create_schemas_only is False and self.source is None
    if timing_started and not source_missing:
        self.print_summary()

    sys.exit(1)

0 comments on commit 33b4347

Please sign in to comment.