From 29afc060a4880e34b768767a6648e30d371889b0 Mon Sep 17 00:00:00 2001
From: Jourdelune
Date: Sun, 29 Sep 2024 11:52:39 +0200
Subject: [PATCH] Use pytubefix

---
 README.md                                 |   2 +-
 main.py                                   |  13 +-
 multi_crawler/crawlers/youtube_crawler.py | 242 ++--------------------
 multi_crawler/ytb_session.py              | 112 ----------
 test.py                                   |  20 --
 5 files changed, 17 insertions(+), 372 deletions(-)
 delete mode 100644 multi_crawler/ytb_session.py
 delete mode 100644 test.py

diff --git a/README.md b/README.md
index 7ae7c99..262bec1 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ pip install -r requirements.txt
 Run the crawler
 
 ```bash
-python main.py --csv --input FILE.txt --overwrite --file_name FILE.csv --num_processes 40
+python main.py --csv --input FILE.txt --overwrite --file_name FILE.csv
 ```
 
 ## License
diff --git a/main.py b/main.py
index 3335b12..a2f46ee 100644
--- a/main.py
+++ b/main.py
@@ -45,13 +45,6 @@
         help="Use Tor proxy to make requests on youtube",
         default=False,
     )
-    argparser.add_argument(
-        "--num_processes",
-        type=int,
-        help="Number of processes to use for crawling",
-        default=40,
-        required=False,
-    )
 
     args = argparser.parse_args()
     if args.csv and args.file_name is None:
@@ -65,11 +58,7 @@
             logging.info("Processing line: %s", line)
 
             if line.startswith("youtube:"):
-                crawlers = YoutubeCrawler(
-                    line.split(" ", 1)[1],
-                    callback=exporter,
-                    num_processes=args.num_processes,
-                )
+                crawlers = YoutubeCrawler(line.split(" ", 1)[1], callback=exporter)
             else:
                 crawlers = ArchiveCrawler(line.split(" ", 1)[1], callback=exporter)
             crawlers.crawl()
diff --git a/multi_crawler/crawlers/youtube_crawler.py b/multi_crawler/crawlers/youtube_crawler.py
index 479197c..97191fd 100644
--- a/multi_crawler/crawlers/youtube_crawler.py
+++ b/multi_crawler/crawlers/youtube_crawler.py
@@ -1,258 +1,46 @@
-import json
 import logging
-import time
-from concurrent.futures import ThreadPoolExecutor
 from typing import Callable, Sequence
 
-import requests
+from pytubefix import Search, YouTube
 
 from ..models import Audio
-from ..ytb_session import YtbSession
 from .crawlers import BaseCrawler
 
 
 class YoutubeCrawler(BaseCrawler):
     """Class to find and return URLs of Youtube videos based on search terms."""
 
-    YOUTUBE_ENDPOINT = "https://www.youtube.com"
-
     def __init__(
         self,
         terms: Sequence[str],
         callback: Callable,
         process: bool = False,
-        num_processes: int = 10,
     ):
         self._terms = terms
         self._callback = callback
         self._process = process
-        self._num_processes = num_processes
 
         self.logging = logging.getLogger(__name__)
 
-        self._ytb_sessions = {
-            time.time(): YtbSession(
-                {"quiet": True, "noprogress": True, "no_warnings": True}
-            )
-            for _ in range(num_processes)
-        }
-
-        # Create a thread pool with max 10 threads
-        self.executor = ThreadPoolExecutor(max_workers=num_processes)
-        self.futures = set()
-
-    def _get_ytb_data(self, url):
-        # get the oldest session
-        session = self._ytb_sessions.pop(min(self._ytb_sessions.keys()))
-        # append a new session
-        self._ytb_sessions[time.time()] = session
+        self._search = Search(terms, {"params": "EgIwAQ%3D%3D"})
 
-        try:
-            info = session.extract_info(url, download=False)
-        except Exception as e:
-            logging.error("Error extracting info from %s: %s", url, e)
-            return
-
-        logging.info("Found music video: %s", info["title"])
+    def _get_ytb_data(self, video: YouTube) -> None:
+        logging.info("Found music video: %s", video.title)
 
         audio = Audio(
-            url=url,
-            title=info["title"],
-            author=info["channel"],
-            description=info["description"],
-            tags=info["tags"],
+            url=video.watch_url,
+            title=video.title,
+            author=video.author,
+            description=video.description,
+            tags=video.keywords,
         )
 
         self._callback(audio)
 
-    def _manage_futures(self):
-        """Helper function to clean up completed futures and maintain a max of 10 threads."""
-        # Check if any threads have finished and remove them
-        completed_futures = {fut for fut in self.futures if fut.done()}
-        for fut in completed_futures:
-            fut.result()
-
-            self.futures.remove(fut)
-
     def crawl(self, *args, **kwargs) -> None:
         """Find and return URLs of Youtube videos based on search terms."""
-        nb_results = kwargs.get("nb_results", -1)
-
-        results_nbm = 0
-        next_page_token = None
-
-        while results_nbm < nb_results or nb_results == -1:
-            if next_page_token is None:
-                results = self._get_list_by_keyword(self._terms)
-            else:
-                results = self._next_page(next_page_token)
-
-            if "items" in results:
-                if len(results["items"]) == 0:
-                    break  # no more results
-
-                for item in results["items"]:
-                    url = f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}"
-
-                    # Ensure we don't have more than 10 active threads
-                    while len(self.futures) > self._num_processes:
-                        self._manage_futures()
-                        time.sleep(0.1)  # Sleep briefly to avoid tight loop
-
-                    future = self.executor.submit(self._get_ytb_data, url)
-                    self.futures.add(future)
-
-                    results_nbm += 1
-
-            if "nextPage" in results:
-                next_page_token = results["nextPage"]
-            else:
-                break
-
-        self.logging.info("Found %d url.", results_nbm)
-
-    def _get_youtube_init_data(self, url):
-        init_data = {}
-        api_token = None
-        context = None
-
-        try:
-            response = requests.get(url, timeout=10)
-            page_content = response.text
-
-            yt_init_data = page_content.split("var ytInitialData =")
-
-            if yt_init_data and len(yt_init_data) > 1:
-                data = yt_init_data[1].split("</script>")[0].strip()[:-1]
-
-                if "innertubeApiKey" in page_content:
-                    api_token = (
-                        page_content.split("innertubeApiKey")[1]
-                        .strip()
-                        .split(",")[0]
-                        .split('"')[2]
-                    )
-
-                if "INNERTUBE_CONTEXT" in page_content:
-                    context = json.loads(
-                        page_content.split("INNERTUBE_CONTEXT")[1].strip()[2:-2]
-                    )
-
-                init_data = json.loads(data)
-                return {
-                    "initdata": init_data,
-                    "apiToken": api_token,
-                    "context": context,
-                }
-            else:
-                print("cannot_get_init_data")
-                raise Exception("Cannot get init data")
-
-        except Exception as ex:
-            print(ex)
-            raise ex
-
-    def _get_list_by_keyword(self, keyword):
-        endpoint = f"{self.YOUTUBE_ENDPOINT}/results?search_query={keyword}&sp=EgIwAQ%253D%253D"
-
-        try:
-            page = self._get_youtube_init_data(endpoint)
-
-            section_list_renderer = page["initdata"]["contents"][
-                "twoColumnSearchResultsRenderer"
-            ]["primaryContents"]["sectionListRenderer"]
-
-            cont_token = {}
-            items = []
-
-            for content in section_list_renderer["contents"]:
-                if "continuationItemRenderer" in content:
-                    cont_token = content["continuationItemRenderer"][
-                        "continuationEndpoint"
-                    ]["continuationCommand"]["token"]
-                elif "itemSectionRenderer" in content:
-                    for item in content["itemSectionRenderer"]["contents"]:
-                        video_render = item.get("videoRenderer")
-                        if video_render and "videoId" in video_render:
-                            items.append(self._video_render_func(item))
-
-            api_token = page["apiToken"]
-            context = page["context"]
-            next_page_context = {"context": context, "continuation": cont_token}
-
-            return {
-                "items": items,
-                "nextPage": {
-                    "nextPageToken": api_token,
-                    "nextPageContext": next_page_context,
-                },
-            }
-
-        except Exception as ex:
-            print(ex)
-            raise ex
-
-    def _next_page(self, next_page):
-        endpoint = f"{self.YOUTUBE_ENDPOINT}/youtubei/v1/search?key={next_page['nextPageToken']}"
-
-        try:
-            response = requests.post(
-                endpoint, json=next_page["nextPageContext"], timeout=10
-            )
-            page_data = response.json()
-
-            item1 = page_data["onResponseReceivedCommands"][0][
-                "appendContinuationItemsAction"
-            ]
-            items = []
-
-            for conitem in item1["continuationItems"]:
-                if "itemSectionRenderer" in conitem:
-                    for item in conitem["itemSectionRenderer"]["contents"]:
-                        video_render = item.get("videoRenderer")
-                        if video_render and "videoId" in video_render:
-                            items.append(self._video_render_func(item))
-                elif "continuationItemRenderer" in conitem:
-                    next_page["nextPageContext"]["continuation"] = conitem[
-                        "continuationItemRenderer"
-                    ]["continuationEndpoint"]["continuationCommand"]["token"]
-
-            return {"items": items, "nextPage": next_page}
-
-        except Exception as ex:
-            print(ex)
-            raise ex
-
-    def _video_render_func(self, json_data):
-        try:
-            if json_data and "videoRenderer" in json_data:
-                video_renderer = json_data["videoRenderer"]
-
-                is_live = False
-                if "badges" in video_renderer and video_renderer["badges"]:
-                    badge = video_renderer["badges"][0].get("metadataBadgeRenderer")
-                    if badge and badge.get("style") == "BADGE_STYLE_TYPE_LIVE_NOW":
-                        is_live = True
-
-                if "thumbnailOverlays" in video_renderer:
-                    for item in video_renderer["thumbnailOverlays"]:
-                        overlay = item.get("thumbnailOverlayTimeStatusRenderer")
-                        if overlay and overlay.get("style") == "LIVE":
-                            is_live = True
-
-                return {
-                    "id": video_renderer["videoId"],
-                    "type": "video",
-                    "thumbnail": video_renderer["thumbnail"],
-                    "title": video_renderer["title"]["runs"][0]["text"],
-                    "channelTitle": video_renderer.get("ownerText", {})
-                    .get("runs", [{}])[0]
-                    .get("text", ""),
-                    "shortBylineText": video_renderer.get("shortBylineText", ""),
-                    "length": video_renderer.get("lengthText", ""),
-                    "isLive": is_live,
-                }
-            else:
-                return {}
-
-        except Exception as ex:
-            raise ex
+        last_nbm_results = 0
+        while len(self._search.videos) > last_nbm_results:
+            for result in self._search.videos[last_nbm_results:]:
+                self._get_ytb_data(result)
+            last_nbm_results = len(self._search.videos)
+            self._search.get_next_results()
diff --git a/multi_crawler/ytb_session.py b/multi_crawler/ytb_session.py
deleted file mode 100644
index 0358698..0000000
--- a/multi_crawler/ytb_session.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import logging
-import random
-from typing import Any
-
-import yt_dlp
-from yt_dlp.utils import DownloadError
-
-# create a logger for the module with the module name
-logger = logging.getLogger(__name__)
-
-
-class SilentLogger:
-    """Silent logger class that does not log anything to avoid ram usage."""
-
-    def debug(self, msg):
-        pass
-
-    def info(self, msg):
-        pass
-
-    def warning(self, msg):
-        pass
-
-    def error(self, msg):
-        pass
-
-
-class YtbSession:
-    """Wrapper class for YoutubeDL that uses Tor as a proxy."""
-
-    def __init__(self, params: dict = None, max_attemps: int = -1, **kwargs):
-        """Initializes the TorWrapper with optional parameters.
-
-        Args:
-            params (dict, optional): Optional parameters for YoutubeDL. Defaults to None.
-            max_attemps (int, optional): Maximum number of attempts to retry a failed download. Defaults to -1 (infinite).
-
-        """
-        self.params = params if params is not None else {}
-        self.kwargs = kwargs
-        self._max_attempts = max_attemps
-
-        self._init_ytdl()
-
-    def _gen_proxy(self) -> str:
-        """Generates a random proxy string using Tor."""
-        creds = str(random.randint(10000, 10**9)) + ":" + "foobar"
-        return f"socks5://{creds}@127.0.0.1:9050"
-
-    def _init_ytdl(self):
-        """Initializes or reinitializes the YoutubeDL instance with a new proxy."""
-        # Set a new proxy for each initialization
-        self.params["proxy"] = self._gen_proxy()
-        self.params["logger"] = SilentLogger()
-        self.ytdl = yt_dlp.YoutubeDL(self.params, **self.kwargs)
-        logger.info("Initialized YoutubeDL with proxy %s", self.params["proxy"])
-
-    def _handle_download_error(self, method_name: str, *args, **kwargs) -> Any:
-        """Handles DownloadError by reinitializing and retrying the method call in a loop.
-
-        Args:
-            method_name (str): The name of the method to call.
-
-        Returns:
-            any: The return value of the method call or raises the error if unrecoverable.
-        """
-
-        attempt = 0
-
-        while attempt < self._max_attempts or self._max_attempts == -1:
-            try:
-                method = getattr(self.ytdl, method_name)
-                return method(*args, **kwargs)
-            except DownloadError as e:
-                if (
-                    "sign in" in str(e).lower()
-                    or "failed to extract any player response" in str(e).lower()
-                ):
-                    logger.warning(
-                        "DownloadError in %s, reinitializing with new proxy... Attempt %d",
-                        method_name,
-                        attempt + 1,
-                    )
-                    attempt += 1
-                    self._init_ytdl()
-                else:
-                    raise e
-        # If maximum attempts exceeded, raise DownloadError
-        raise DownloadError(f"Failed after {attempt} attempts")
-
-    def extract_info(self, *args, **kwargs):
-        """Extracts information and handles DownloadError by reinitializing YoutubeDL."""
-        return self._handle_download_error("extract_info", *args, **kwargs)
-
-    def download(self, *args, **kwargs):
-        """Downloads a video and handles DownloadError by reinitializing YoutubeDL."""
-        return self._handle_download_error("download", *args, **kwargs)
-
-    def download_with_info_file(self, *args, **kwargs):
-        """Downloads a video with an info file, handles DownloadError by reinitializing."""
-        return self._handle_download_error("download_with_info_file", *args, **kwargs)
-
-    def __getattr__(self, name: str) -> Any:
-        """Redirects attribute access to the YoutubeDL instance.
-
-        Args:
-            name (str): The name of the attribute to access.
-
-        Returns:
-            any: The attribute value.
-        """
-        return getattr(self.ytdl, name)
diff --git a/test.py b/test.py
deleted file mode 100644
index f404586..0000000
--- a/test.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import requests
-
-
-def get_tor_session():
-    session = requests.session()
-    # Tor uses the 9050 port as the default socks port
-    session.proxies = {
-        "http": "socks5://127.0.0.1:9050",
-    }
-    return session
-
-
-# Make a request through the Tor connection
-# IP visible through Tor
-session = get_tor_session()
-print(session.get("http://httpbin.org/ip").text)
-# Above should print an IP different than your public IP
-
-# Following prints your normal public IP
-print(requests.get("http://httpbin.org/ip").text)
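
Note for reviewers: below is a minimal standalone sketch of the pytubefix pagination pattern the new `crawl()` relies on. It assumes `Search.videos` and `get_next_results()` behave exactly as this patch uses them (new results are appended in place to `search.videos`); the query string and the `print` call are placeholders for the crawler's real search terms and exporter callback.

```python
from pytubefix import Search

# The filter dict is copied verbatim from this patch; "EgIwAQ%3D%3D" is the
# encoded search filter the crawler sends along with the query.
search = Search("royalty free piano", {"params": "EgIwAQ%3D%3D"})

seen = 0
while len(search.videos) > seen:
    # Handle only the videos appended since the previous iteration.
    for video in search.videos[seen:]:
        print(video.watch_url, "|", video.title, "|", video.author)
    seen = len(search.videos)
    search.get_next_results()  # fetch the next page and append it to search.videos
```

Slicing from `seen` mirrors `last_nbm_results` in the patch: it keeps each video from being handled twice, since `get_next_results()` grows the same list rather than returning a fresh one, and the loop stops once a fetch adds no new videos.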