diff --git a/multi_crawler/crawlers/youtube_crawler.py b/multi_crawler/crawlers/youtube_crawler.py index c6dfccb..c5fdd54 100644 --- a/multi_crawler/crawlers/youtube_crawler.py +++ b/multi_crawler/crawlers/youtube_crawler.py @@ -2,6 +2,7 @@ import logging import subprocess import time +from concurrent.futures import ThreadPoolExecutor from queue import Queue from threading import Thread from typing import Callable, Sequence @@ -44,8 +45,10 @@ def _create_ytb_dl(self): self.ytb_dl = yt_dlp.YoutubeDL(settings) def _get_po_token(self): - while len(self.po_token_queue.queue) == 0: + if len(self.po_token_queue.queue) == 0: logging.warning("No poo token available. Waiting...") + + while len(self.po_token_queue.queue) == 0: time.sleep(1) return self.po_token_queue.get() @@ -110,12 +113,12 @@ def __init__( isinstance(self._session, TorSession), session=self._session ) + # Create a thread pool with max 10 threads + self.executor = ThreadPoolExecutor(max_workers=10) + self.futures = set() + def _get_ytb_data(self, url): - try: - info = self.ytb_dl_session.extract_info(url) - except Exception as ex: - self.logging.error("Failed to extract info: %s", ex) - return + info = self.ytb_dl_session.extract_info(url) if not "Music" in info["categories"]: logging.info("Skipping non-music video: %s", info["title"]) @@ -132,6 +135,15 @@ def _get_ytb_data(self, url): self._callback(audio) + def _manage_futures(self): + """Helper function to clean up completed futures and maintain a max of 10 threads.""" + # Check if any threads have finished and remove them + completed_futures = {fut for fut in self.futures if fut.done()} + for fut in completed_futures: + fut.result() + + self.futures.remove(fut) + def crawl(self, *args, **kwargs) -> None: """Find and return URLs of Youtube videos based on search terms.""" @@ -151,7 +163,15 @@ def crawl(self, *args, **kwargs) -> None: break # no more results for item in results["items"]: - self._get_ytb_data(f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}") + url = f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}" + + # Ensure we don't have more than 10 active threads + while len(self.futures) >= 10: + self._manage_futures() + time.sleep(0.1) # Sleep briefly to avoid tight loop + + future = self.executor.submit(self._get_ytb_data, url) + self.futures.add(future) results_nbm += 1 diff --git a/smart-ipv6-rotator b/smart-ipv6-rotator deleted file mode 160000 index 8e2587e..0000000 --- a/smart-ipv6-rotator +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8e2587e1da9b049e65323f17e82777680f029fac