Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trap SIGINT to suppress noisy stack traces #113

Merged
merged 27 commits into from
Mar 10, 2023
Merged
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1f68ba4
trap SIGINT to suppress noisy stack traces
salsferrazza Mar 7, 2023
d144886
Update main.py
mservidio Mar 7, 2023
b961616
Update main.py
mservidio Mar 7, 2023
f122855
linting
mservidio Mar 7, 2023
598517a
Update main.py
mservidio Mar 7, 2023
3eedf43
Merge branch 'main' into trap_sigint
mservidio Mar 7, 2023
c01e717
Update .pylintrc
mservidio Mar 7, 2023
2074447
refactor, dump record counts on SIGINT
salsferrazza Mar 7, 2023
d809a76
remove merge conflict residue
salsferrazza Mar 7, 2023
b0bc0a6
add comment
salsferrazza Mar 7, 2023
acf10f9
Rename factory (#109)
mservidio Mar 7, 2023
94ad5fa
tweak
salsferrazza Mar 7, 2023
58cb641
Update main.py
mservidio Mar 7, 2023
fa1343d
Merge branch 'main' into trap_sigint
mservidio Mar 7, 2023
3c45b68
Update OutputManager.py
mservidio Mar 7, 2023
6162a16
Merge branch 'trap_sigint' of https://github.com/GoogleCloudPlatform/…
mservidio Mar 7, 2023
c9734dd
Update MessageParser.py
mservidio Mar 7, 2023
ce2bb5e
linting
mservidio Mar 7, 2023
7cf612c
Merge branch 'main' into trap_sigint
salsferrazza Mar 9, 2023
50fb683
print summary on SIGINT
salsferrazza Mar 10, 2023
6dae075
move to message parser
mservidio Mar 10, 2023
4e0ff6b
Update main.py
mservidio Mar 10, 2023
57f2d38
Update main.py
mservidio Mar 10, 2023
3350b32
Update MessageParser.py
mservidio Mar 10, 2023
5065eca
Update pylint.yml
mservidio Mar 10, 2023
3f7a259
Update ITCHMessageFactory.py
mservidio Mar 10, 2023
4e4f1e9
Merge branch 'main' into trap_sigint
mservidio Mar 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions transcoder/message/MessageParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import importlib
import logging
import os
import signal
import sys
from datetime import datetime

from transcoder import LineEncoding
Expand Down Expand Up @@ -49,6 +51,9 @@ def __init__(self, # pylint: disable=too-many-arguments),too-many-locals
create_schema_enforcing_topics: bool = True, sampling_count: int = None,
message_type_inclusions: str = None, message_type_exclusions: str = None,
fix_header_tags: str = None, fix_separator: int = 1):

self.start_time = None
self.source = None
self.source_file_path = source_file_path
self.source_file_encoding = source_file_encoding
self.source_file_format_type = source_file_format_type
Expand Down Expand Up @@ -96,6 +101,8 @@ def __init__(self, # pylint: disable=too-many-arguments),too-many-locals
fix_header_tags=fix_header_tags,
fix_separator=fix_separator)

signal.signal(signal.SIGINT, self.trap)

def setup_handlers(self, message_handlers: str):
"""Initialize MessageHandler instances to employ at runtime"""

Expand Down Expand Up @@ -127,25 +134,28 @@ def setup_handlers(self, message_handlers: str):

def process(self):
"""Entry point for individual message processing"""
start_time = datetime.now()
self.start_time = datetime.now()
self.process_schemas()

source: Source = None
if self.create_schemas_only is False:
source: Source = get_message_source(self.source_file_path, self.source_file_encoding,
self.source_file_format_type, self.source_file_endian,
skip_lines=self.skip_lines, skip_bytes=self.skip_bytes,
message_skip_bytes=self.message_skip_bytes,
line_encoding=self.line_encoding)
self.source: Source = get_message_source(self.source_file_path, self.source_file_encoding,
self.source_file_format_type, self.source_file_endian,
skip_lines=self.skip_lines, skip_bytes=self.skip_bytes,
message_skip_bytes=self.message_skip_bytes,
line_encoding=self.line_encoding)

self.process_data(source)
self.process_data()

if self.output_manager is not None:
self.output_manager.wait_for_completion()

self.print_summary()

def print_summary(self):
"""Print summary of the messages that were processed"""
if logging.getLogger().isEnabledFor(logging.INFO):
end_time = datetime.now()
time_diff = end_time - start_time
time_diff = end_time - self.start_time
total_seconds = time_diff.total_seconds()

if self.create_schemas_only is True:
Expand All @@ -167,12 +177,12 @@ def process(self):
logging.info('Message type exclusions: %s', self.message_parser.message_type_exclusions)

if self.create_schemas_only is False:
logging.info('Source record count: %s', source.record_count)
logging.info('Source record count: %s', self.source.record_count)
logging.info('Processed record count: %s', self.message_parser.record_count)
logging.info('Processed schema count: %s', self.message_parser.total_schema_count)
logging.info('Summary of message counts: %s', self.message_parser.record_type_count)
logging.info('Summary of error message counts: %s', self.message_parser.error_record_type_count)
logging.info('Message rate: %s per second', round(source.record_count / total_seconds, 6))
logging.info('Message rate: %s per second', round(self.source.record_count / total_seconds, 6))

logging.info('Total runtime in seconds: %s', round(total_seconds, 6))
logging.info('Total runtime in minutes: %s', round(total_seconds / 60, 6))
Expand All @@ -196,10 +206,10 @@ def process_schemas(self):
if self.lazy_create_resources is False:
self.output_manager.wait_for_schema_creation()

def process_data(self, source):
def process_data(self):
"""Entry point for individual message processing"""
with source:
for raw_record in source.get_message_iterator():
with self.source:
for raw_record in self.source.get_message_iterator():
message: ParsedMessage = None
try:
self.error_writer.set_step(TranscodeStep.DECODE_MESSAGE)
Expand Down Expand Up @@ -241,3 +251,9 @@ def handle_exception(self, raw_record, message, exception):

if self.continue_on_error is False:
raise exception

def trap(self, _signum, _frame):
"""Trap SIGINT to suppress noisy stack traces and show interim summary"""
print()
self.print_summary()
sys.exit(1)