From a128521c19df0eb50850f3bb74fc9685c2beb7e1 Mon Sep 17 00:00:00 2001 From: rocky4546 <24759693+rocky4546@users.noreply.github.com> Date: Sat, 2 Dec 2023 16:35:21 -0600 Subject: [PATCH] Added parallel processing of video clips --- lib/clients/channels/channels.py | 4 +- lib/common/decorators.py | 50 ++--- lib/common/utils.py | 2 +- lib/main.py | 4 +- lib/plugins/plugin_channels.py | 8 +- lib/plugins/plugin_epg.py | 2 +- lib/plugins/plugin_programs.py | 2 +- lib/plugins/repo_handler.py | 2 +- lib/streams/m3u8_queue.py | 365 ++++++++++++++++++------------- tvh_main.py | 4 +- 10 files changed, 253 insertions(+), 190 deletions(-) diff --git a/lib/clients/channels/channels.py b/lib/clients/channels/channels.py index e2d5873..8a796a5 100644 --- a/lib/clients/channels/channels.py +++ b/lib/clients/channels/channels.py @@ -167,6 +167,7 @@ def get_channels_json(_config, _base_url, _namespace, _instance, _plugins): config_section = utils.instance_config_section(sid_data['namespace'], sid_data['instance']) if not _config[config_section]['enabled']: continue + sids_processed.append(sid) stream = _config[config_section]['player-stream_type'] if stream == 'm3u8redirect': uri = sid_data['json']['stream_url'] @@ -195,7 +196,6 @@ def get_channels_xml(_config, _base_url, _namespace, _instance, _plugins): for sid_data in sid_data_list: if sid in sids_processed: continue - sids_processed.append(sid) if not sid_data['enabled']: continue if not _plugins.get(sid_data['namespace']): @@ -205,9 +205,11 @@ def get_channels_xml(_config, _base_url, _namespace, _instance, _plugins): if not _plugins[sid_data['namespace']] \ .plugin_obj.instances[sid_data['instance']].enabled: continue + config_section = utils.instance_config_section(sid_data['namespace'], sid_data['instance']) if not _config[config_section]['enabled']: continue + sids_processed.append(sid) stream = _config[config_section]['player-stream_type'] if stream == 'm3u8redirect': uri = sid_data['json']['stream_url'] diff --git a/lib/common/decorators.py b/lib/common/decorators.py index 40900a7..80ec282 100644 --- a/lib/common/decorators.py +++ b/lib/common/decorators.py @@ -69,36 +69,12 @@ def wrapper_func(self, *args, **kwargs): ex_save = ex self.logger.info("ConnectionResetError in function {}(), retrying {} {} {}" .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) - except (requests.exceptions.HTTPError, urllib.error.HTTPError) as ex: - ex_save = ex - self.logger.info("HTTPError in function {}(), retrying {} {} {}" - .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0), )) - except urllib.error.URLError as ex: - ex_save = ex - if isinstance(ex.reason, ConnectionRefusedError): - self.logger.info("URLError:ConnectionRefusedError in function {}(): {} {} {}" - .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) - count = 5 - while count > 0: - try: - x = f(self, *args, **kwargs) - return x - except urllib.error.URLError as ex2: - self.logger.debug("{} URLError:ConnectionRefusedError in function {}(): {} {} {}" - .format(count, f.__qualname__, os.getpid(), str(ex_save), str(arg0))) - count -= 1 - time.sleep(.5) - except Exception as ex3: - break - else: - self.logger.info("URLError in function {}(), retrying (): {} {} {}" - .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) except requests.exceptions.InvalidURL as ex: ex_save = ex self.logger.info("InvalidURL Error in function {}(), retrying {} {} {}" .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) - except (socket.timeout, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as ex: + except (socket.timeout, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout, httpx.ReadTimeout, httpx.ConnectTimeout) as ex: ex_save = ex self.logger.info("Socket Timeout Error in function {}(), retrying {} {} {}" .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) @@ -147,6 +123,30 @@ def wrapper_func(self, *args, **kwargs): ex_save = ex self.logger.info('InvalidURL Error, encoding and trying again. In function {}() {} {} {}' .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) + except (requests.exceptions.HTTPError, urllib.error.HTTPError, httpx.HTTPError) as ex: + ex_save = ex + self.logger.info("HTTPError in function {}(), retrying {} {} {}" + .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0), )) + except urllib.error.URLError as ex: + ex_save = ex + if isinstance(ex.reason, ConnectionRefusedError): + self.logger.info("URLError:ConnectionRefusedError in function {}(): {} {} {}" + .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) + count = 5 + while count > 0: + try: + x = f(self, *args, **kwargs) + return x + except urllib.error.URLError as ex2: + self.logger.debug("{} URLError:ConnectionRefusedError in function {}(): {} {} {}" + .format(count, f.__qualname__, os.getpid(), str(ex_save), str(arg0))) + count -= 1 + time.sleep(.5) + except Exception as ex3: + break + else: + self.logger.info("URLError in function {}(), retrying (): {} {} {}" + .format(f.__qualname__, os.getpid(), str(ex_save), str(arg0))) except (requests.exceptions.ProxyError, requests.exceptions.SSLError, \ requests.exceptions.TooManyRedirects, requests.exceptions.InvalidHeader, \ requests.exceptions.InvalidProxyURL, requests.exceptions.ChunkedEncodingError, \ diff --git a/lib/common/utils.py b/lib/common/utils.py index 9b73e82..45eddf9 100644 --- a/lib/common/utils.py +++ b/lib/common/utils.py @@ -37,7 +37,7 @@ import lib.common.exceptions as exceptions -VERSION = '0.9.14.00-RC01' +VERSION = '0.9.14.00-RC02' CABERNET_URL = 'https://github.com/cabernetwork/cabernet' CABERNET_ID = 'cabernet' CABERNET_REPO = 'manifest.json' diff --git a/lib/main.py b/lib/main.py index 5c320fb..9637654 100644 --- a/lib/main.py +++ b/lib/main.py @@ -43,9 +43,9 @@ try: import httpx except ImportError: - pip(['install', 'httpx']) + pip(['install', 'httpx[http2]']) except ModuleNotFoundError: - print('Unable to install required httpx module') + print('Unable to install required httpx[http2] module') except (ImportError, ModuleNotFoundError): print('Unable to load pip module to install required modules') diff --git a/lib/plugins/plugin_channels.py b/lib/plugins/plugin_channels.py index e5479bd..3c08529 100644 --- a/lib/plugins/plugin_channels.py +++ b/lib/plugins/plugin_channels.py @@ -86,7 +86,7 @@ def get_uri_json_data(self, _uri): header = { 'Content-Type': 'application/json', 'User-agent': utils.DEFAULT_USER_AGENT} - resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=(8, 8)) + resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=8) x = resp.json() resp.raise_for_status() return x @@ -99,9 +99,9 @@ def get_uri_data(self, _uri, _header=None, _data=None): else: header = _header if _data: - resp = self.plugin_obj.http_session.post(_uri, headers=header, data=_data, timeout=(8, 8)) + resp = self.plugin_obj.http_session.post(_uri, headers=header, data=_data, timeout=8) else: - resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=(8, 8)) + resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=8) x = resp.content return x @@ -180,7 +180,7 @@ def get_thumbnail_size(self, _thumbnail, _ch_uid, ): 'Accept-Encoding': 'identity', 'Connection': 'Keep-Alive' } - resp = self.plugin_obj.http_session.get(_thumbnail, headers=h, timeout=(8, 8)) + resp = self.plugin_obj.http_session.get(_thumbnail, headers=h, timeout=8) resp.raise_for_status() img_blob = resp.content fp = io.BytesIO(img_blob) diff --git a/lib/plugins/plugin_epg.py b/lib/plugins/plugin_epg.py index ca57cac..ad7a931 100644 --- a/lib/plugins/plugin_epg.py +++ b/lib/plugins/plugin_epg.py @@ -65,7 +65,7 @@ def get_uri_data(self, _uri, _header=None): header = {'User-agent': utils.DEFAULT_USER_AGENT} else: header = _header - resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=(4, 8)) + resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=8) x = resp.json() resp.raise_for_status() return x diff --git a/lib/plugins/plugin_programs.py b/lib/plugins/plugin_programs.py index f28b21a..0b4831b 100644 --- a/lib/plugins/plugin_programs.py +++ b/lib/plugins/plugin_programs.py @@ -60,7 +60,7 @@ def get_uri_data(self, _uri, _header=None): header = {'User-agent': utils.DEFAULT_USER_AGENT} else: header = _header - resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=(4, 8)) + resp = self.plugin_obj.http_session.get(_uri, headers=header, timeout=8) x = resp.json() resp.raise_for_status() return x diff --git a/lib/plugins/repo_handler.py b/lib/plugins/repo_handler.py index 30c8eb4..353de7d 100644 --- a/lib/plugins/repo_handler.py +++ b/lib/plugins/repo_handler.py @@ -149,7 +149,7 @@ def update_plugins(self, _repo_settings): def get_uri_data(self, _uri): header = { 'User-agent': utils.DEFAULT_USER_AGENT} - resp = RepoHandler.http_session.get(_uri, headers=header, timeout=(4, 8)) + resp = RepoHandler.http_session.get(_uri, headers=header, timeout=8) x = resp.content resp.raise_for_status() return x diff --git a/lib/streams/m3u8_queue.py b/lib/streams/m3u8_queue.py index c0c17fd..14f62c2 100644 --- a/lib/streams/m3u8_queue.py +++ b/lib/streams/m3u8_queue.py @@ -43,6 +43,7 @@ PLAY_LIST = OrderedDict() +PROCESSED_URLS = {} IN_QUEUE = Queue() OUT_QUEUE = Queue() TERMINATE_REQUESTED = False @@ -50,108 +51,51 @@ STREAM_QUEUE = Queue() OUT_QUEUE_LIST = [] IS_VOD = False +UID_COUNTER = 1 +UID_PROCESSED = 1 -class M3U8Queue(Thread): - """ - This runs as an independent process (one per stream) to get and process the - data stream as fast as possible and return it to the tuner web server for - output to the client. - """ - is_stuck = None - #http_session = httpx.Client(http2=True) - http_session = httpx.Client(verify=False) - - def __init__(self, _config, _channel_dict): +class M3U8GetUriData(Thread): + def __init__(self, _queue_item, _uid_counter, _config): Thread.__init__(self) + self.queue_item = _queue_item + self.uid_counter = _uid_counter + self.video = Video(_config) self.logger = logging.getLogger(__name__ + str(threading.get_ident())) - self.config = _config - self.namespace = _channel_dict['namespace'].lower() self.pts_validation = None - self.initialized_psi = False - self.first_segment = True - self.config_section = utils.instance_config_section(_channel_dict['namespace'], _channel_dict['instance']) - self.atsc_msg = ATSCMsg() - self.channel_dict = _channel_dict - if self.config[self.config_section]['player-enable_pts_filter']: - self.pts_validation = PTSValidation(_config, _channel_dict) - self.video = Video(self.config) - self.atsc = _channel_dict['atsc'] - if _channel_dict['json'].get('Header') is None: - self.header = {'User-agent': utils.DEFAULT_USER_AGENT} - else: - self.header = _channel_dict['json']['Header'] - if _channel_dict['json'].get('use_date_on_m3u8_key') is None: - self.use_date_on_key = True - else: - self.use_date_on_key = _channel_dict['json']['use_date_on_m3u8_key'] - - self.pts_resync = PTSResync(_config, self.config_section, _channel_dict['uid']) - self.key_list = {} + if _config[M3U8Queue.config_section]['player-enable_pts_filter']: + self.pts_validation = PTSValidation(_config, M3U8Queue.channel_dict) self.start() + def run(self): + global UID_COUNTER + global UID_PROCESSED + global STREAM_QUEUE + self.logger.debug('M3U8GetUriData started {} {} {}'.format(self.queue_item['data']['uri'], os.getpid(), threading.get_ident())) + m3u8_data = self.process_m3u8_item(self.queue_item) + PROCESSED_URLS[self.uid_counter] = m3u8_data + STREAM_QUEUE.put({'uri_dt': 'check_processed_list'}) + m3u8_data = None + self.queue_item = None + self.uid_counter = None + self.video = None + self.pts_validation = None + self.logger.debug('M3U8GetUriData terminated {} {}'.format(os.getpid(), threading.get_ident())) + self.logger = None + + + @handle_url_except() def get_uri_data(self, _uri): - resp = self.http_session.get(_uri, headers=self.header, timeout=(8, 8), follow_redirects=True) + self.logger.warning(M3U8Queue.http_header) + resp = M3U8Queue.http_session.get(_uri, headers=M3U8Queue.http_header, timeout=5, follow_redirects=True) x = resp.content resp.raise_for_status() return x - def run(self): - global OUT_QUEUE - global STREAM_QUEUE - global TERMINATE_REQUESTED - try: - while not TERMINATE_REQUESTED: - queue_item = STREAM_QUEUE.get() - if queue_item['uri_dt'] == 'terminate': - self.logger.debug('Received terminate from internalproxy {}'.format(os.getpid())) - TERMINATE_REQUESTED = True - break - elif queue_item['uri_dt'] == 'status': - out_queue_put({'uri': 'running', - 'data': None, - 'stream': None, - 'atsc': None}) - continue - time.sleep(0.01) - self.process_m3u8_item(queue_item) - except (KeyboardInterrupt, EOFError) as ex: - TERMINATE_REQUESTED = True - clear_queues() - if self.pts_resync is not None: - self.pts_resync.terminate() - self.pts_resync = None - time.sleep(0.01) - sys.exit() - except Exception as ex: - TERMINATE_REQUESTED = True - STREAM_QUEUE.put({'uri_dt': 'terminate'}) - IN_QUEUE.put({'uri': 'terminate'}) - if self.pts_resync is not None: - self.pts_resync.terminate() - self.pts_resync = None - clear_queues() - time.sleep(0.01) - self.logger.exception('{}'.format( - 'UNEXPECTED EXCEPTION M3U8Queue=')) - sys.exit() - # we are terminating so cleanup ffmpeg - if self.pts_resync is not None: - self.pts_resync.terminate() - self.pts_resync = None - time.sleep(0.01) - out_queue_put({'uri': 'terminate', - 'data': None, - 'stream': None, - 'atsc': None}) - time.sleep(0.01) - TERMINATE_REQUESTED = True - self.logger.debug('M3U8Queue terminated {}'.format(os.getpid())) - def decrypt_stream(self, _data): if _data['key'] and _data['key']['uri']: - if _data['key']['uri'] in self.key_list.keys(): - key_data = self.key_list[_data['key']['uri']] + if _data['key']['uri'] in M3U8Queue.key_list.keys(): + key_data = M3U8Queue.key_list[_data['key']['uri']] self.logger.debug('Reusing key {} {}'.format(os.getpid(), _data['key']['uri'])) elif not _data['key']['uri'].startswith('http'): self.logger.warning('Unknown protocol, aborting {} {}'.format(os.getpid(), _data['key']['uri'])) @@ -160,7 +104,7 @@ def decrypt_stream(self, _data): key_data = self.get_uri_data(_data['key']['uri']) if key_data is not None: - self.key_list[_data['key']['uri']] = key_data + M3U8Queue.key_list[_data['key']['uri']] = key_data if _data['key']['iv'] is None: # if iv is none, use a random value iv = bytearray.fromhex('000000000000000000000000000000F6') @@ -171,36 +115,57 @@ def decrypt_stream(self, _data): cipher = Cipher(algorithms.AES(key_data), modes.CBC(iv), default_backend()) decryptor = cipher.decryptor() self.video.data = decryptor.update(self.video.data) - if len(self.key_list.keys()) > 20: - del self.key_list[list(self.key_list)[0]] + if len(M3U8Queue.key_list.keys()) > 20: + del M3U8Queue.key_list[list(M3U8Queue.key_list)[0]] return True def atsc_processing(self): - if not self.atsc: - p_list = self.atsc_msg.extract_psip(self.video.data) + if not M3U8Queue.atsc: + p_list = M3U8Queue.atsc_msg.extract_psip(self.video.data) if len(p_list) != 0: - self.atsc = p_list + M3U8Queue.atsc = p_list self.channel_dict['atsc'] = p_list - self.initialized_psi = True + M3U8Queue.initialized_psi = True return p_list - elif not self.initialized_psi: - p_list = self.atsc_msg.extract_psip(self.video.data) - if len(self.atsc) < len(p_list): - self.atsc = p_list + elif not M3U8Queue.initialized_psi: + p_list = M3U8Queue.atsc_msg.extract_psip(self.video.data) + if len(M3U8Queue.atsc) < len(p_list): + M3U8Queue.atsc = p_list self.channel_dict['atsc'] = p_list - self.initialized_psi = True + M3U8Queue.initialized_psi = True return p_list - if len(self.atsc) == len(p_list): + if len(M3U8Queue.atsc) == len(p_list): for i in range(len(p_list)): - if p_list[i][4:] != self.atsc[i][4:]: - self.atsc = p_list + if p_list[i][4:] != M3U8Queue.atsc[i][4:]: + M3U8Queue.atsc = p_list self.channel_dict['atsc'] = p_list - self.initialized_psi = True + M3U8Queue.initialized_psi = True is_changed = True return p_list return None + def is_pts_valid(self): + if self.pts_validation is None: + return True + results = self.pts_validation.check_pts(self.video) + if results['byteoffset'] != 0: + return False + if results['refresh_stream']: + return False + if results['reread_buffer']: + return False + return True + + def get_stream_from_atsc(self): + if M3U8Queue.atsc is not None: + return M3U8Queue.atsc_msg.format_video_packets(self.atsc) + else: + self.logger.info(''.join([ + 'No ATSC msg available during filtered content, ', + 'recommend running this channel again to catch the ATSC msg.'])) + return M3U8Queue.atsc_msg.format_video_packets() + def process_m3u8_item(self, _queue_item): global IS_VOD global TERMINATE_REQUESTED @@ -209,12 +174,11 @@ def process_m3u8_item(self, _queue_item): uri_dt = _queue_item['uri_dt'] data = _queue_item['data'] if data['filtered']: - out_queue_put({'uri': data['uri'], + PLAY_LIST[uri_dt]['played'] = True + return {'uri': data['uri'], 'data': data, 'stream': self.get_stream_from_atsc(), - 'atsc': None}) - PLAY_LIST[uri_dt]['played'] = True - time.sleep(0.01) + 'atsc': None} else: if IS_VOD: count = self.config['stream']['vod_retries'] @@ -224,83 +188,180 @@ def process_m3u8_item(self, _queue_item): self.video.data = self.get_uri_data(data['uri']) if self.video.data: break + + # TBD WHAT TO DO WITH THIS? out_queue_put({'uri': 'extend', 'data': data, 'stream': None, 'atsc': None}) count -= 1 + if uri_dt not in PLAY_LIST.keys(): + self.logger.debug('{} uri_dt not in PLAY_LIST keys {}'.format(os.getpid(), uri_dt)) return if self.video.data is None: PLAY_LIST[uri_dt]['played'] = True - out_queue_put({'uri': data['uri'], + return {'uri': data['uri'], 'data': data, 'stream': None, 'atsc': None - }) - return + } if not self.decrypt_stream(data): # terminate if stream is not decryptable - out_queue_put({'uri': 'terminate', - 'data': data, - 'stream': None, - 'atsc': None}) TERMINATE_REQUESTED = True - self.pts_resync.terminate() - self.pts_resync = None + M3U8Queue.pts_resync.terminate() + M3U8Queue.pts_resync = None clear_queues() PLAY_LIST[uri_dt]['played'] = True - time.sleep(0.01) - return + return {'uri': 'terminate', + 'data': data, + 'stream': None, + 'atsc': None} if not self.is_pts_valid(): PLAY_LIST[uri_dt]['played'] = True - out_queue_put({'uri': data['uri'], + return {'uri': data['uri'], 'data': data, 'stream': None, 'atsc': None - }) - return + } - if self.first_segment: - self.first_segment = False - self.pts_resync.resequence_pts(self.video) + M3U8Queue.pts_resync.resequence_pts(self.video) if self.video.data is None: - out_queue_put({'uri': data['uri'], + PLAY_LIST[uri_dt]['played'] = True + return{'uri': data['uri'], 'data': data, 'stream': self.video.data, - 'atsc': None}) - PLAY_LIST[uri_dt]['played'] = True - time.sleep(0.01) - return + 'atsc': None} atsc_default_msg = self.atsc_processing() - out_queue_put({'uri': data['uri'], + PLAY_LIST[uri_dt]['played'] = True + return {'uri': data['uri'], 'data': data, 'stream': self.video.data, 'atsc': atsc_default_msg - }) - PLAY_LIST[uri_dt]['played'] = True - time.sleep(0.1) + } - def is_pts_valid(self): - if self.pts_validation is None: - return True - results = self.pts_validation.check_pts(self.video) - if results['byteoffset'] != 0: - return False - if results['refresh_stream']: - return False - if results['reread_buffer']: - return False - return True - def get_stream_from_atsc(self): - if self.atsc is not None: - return self.atsc_msg.format_video_packets(self.atsc) +class M3U8Queue(Thread): + """ + This runs as an independent process (one per stream) to get and process the + data stream as fast as possible and return it to the tuner web server for + output to the client. + """ + is_stuck = None + http_session = httpx.Client(http2=True, verify=False) + http_header = None + key_list = {} + config_section = None + channel_dict = None + pts_resync = None + atsc = None + atsc_msg = None + initialized_psi = False + + def __init__(self, _config, _channel_dict): + Thread.__init__(self) + self.logger = logging.getLogger(__name__ + str(threading.get_ident())) + self.config = _config + self.namespace = _channel_dict['namespace'].lower() + M3U8Queue.config_section = utils.instance_config_section(_channel_dict['namespace'], _channel_dict['instance']) + M3U8Queue.channel_dict = _channel_dict + M3U8Queue.atsc_msg = ATSCMsg() + self.channel_dict = _channel_dict + M3U8Queue.atsc = _channel_dict['atsc'] + if _channel_dict['json'].get('Header') is None: + M3U8Queue.http_header = {'User-agent': utils.DEFAULT_USER_AGENT} else: - self.logger.info(''.join([ - 'No ATSC msg available during filtered content, ', - 'recommend running this channel again to catch the ATSC msg.'])) - return self.atsc_msg.format_video_packets() + M3U8Queue.http_header = _channel_dict['json']['Header'] + if _channel_dict['json'].get('use_date_on_m3u8_key') is None: + self.use_date_on_key = True + else: + self.use_date_on_key = _channel_dict['json']['use_date_on_m3u8_key'] + + M3U8Queue.pts_resync = PTSResync(_config, self.config_section, _channel_dict['uid']) + self.start() + + + def run(self): + global OUT_QUEUE + global STREAM_QUEUE + global TERMINATE_REQUESTED + global UID_COUNTER + global UID_PROCESSED + try: + while not TERMINATE_REQUESTED: + queue_item = STREAM_QUEUE.get() + self.logger.warning('QUEUE SIZE: {}'.format(STREAM_QUEUE.qsize())) + if queue_item['uri_dt'] == 'terminate': + self.logger.debug('Received terminate from internalproxy {}'.format(os.getpid())) + TERMINATE_REQUESTED = True + break + elif queue_item['uri_dt'] == 'status': + out_queue_put({'uri': 'running', + 'data': None, + 'stream': None, + 'atsc': None}) + continue + elif queue_item['uri_dt'] == 'check_processed_list': + self.logger.warning('#### Received check_processed_list {} COUNTER: {} PROCESSED: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED)) + self.check_processed_list() + time.sleep(.1) + continue + + self.logger.warning('**** Received check_processed_list {} COUNTER: {} PROCESSED: {} QUEUE: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS))) + self.check_processed_list() + while UID_COUNTER - UID_PROCESSED > 4: + self.logger.warning('SLOWING PROCESSING DUE TO BACKUP') + time.sleep(.4) + self.check_processed_list() + self.process_queue = M3U8GetUriData(queue_item, UID_COUNTER, self.config) + time.sleep(.1) + + self.logger.warning('thread running for {} COUNTER: {} PROCESSED: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED)) + UID_COUNTER += 1 + except (KeyboardInterrupt, EOFError) as ex: + TERMINATE_REQUESTED = True + clear_queues() + if self.pts_resync is not None: + self.pts_resync.terminate() + self.pts_resync = None + time.sleep(0.01) + sys.exit() + except Exception as ex: + TERMINATE_REQUESTED = True + STREAM_QUEUE.put({'uri_dt': 'terminate'}) + IN_QUEUE.put({'uri': 'terminate'}) + if self.pts_resync is not None: + self.pts_resync.terminate() + self.pts_resync = None + clear_queues() + time.sleep(0.01) + self.logger.exception('{}'.format( + 'UNEXPECTED EXCEPTION M3U8Queue=')) + sys.exit() + # we are terminating so cleanup ffmpeg + if self.pts_resync is not None: + self.pts_resync.terminate() + self.pts_resync = None + time.sleep(0.01) + out_queue_put({'uri': 'terminate', + 'data': None, + 'stream': None, + 'atsc': None}) + time.sleep(0.01) + TERMINATE_REQUESTED = True + self.logger.debug('M3U8Queue terminated {}'.format(os.getpid())) + + + def check_processed_list(self): + global UID_PROCESSED + global PROCESSED_URLS + if len(PROCESSED_URLS) > 0: + first_key = sorted(PROCESSED_URLS.keys())[0] + if first_key == UID_PROCESSED: + self.logger.warning('FIRST KEY={}'.format(first_key)) + out_queue_put(PROCESSED_URLS[first_key]) + del PROCESSED_URLS[first_key] + UID_PROCESSED += 1 class M3U8Process(Thread): diff --git a/tvh_main.py b/tvh_main.py index f1ea303..5e0c686 100644 --- a/tvh_main.py +++ b/tvh_main.py @@ -21,8 +21,8 @@ import sys from inspect import getsourcefile -if sys.version_info.major == 2 or sys.version_info < (3, 7): - print('Error: cabernet requires python 3.7+.') +if sys.version_info.major == 2 or sys.version_info < (3, 8): + print('Error: cabernet requires python 3.8+.') sys.exit(1) from lib import main