From b802816b5ad00ed3dd9362da7d5f9a93f6526608 Mon Sep 17 00:00:00 2001 From: Jourdelune Date: Sat, 28 Sep 2024 17:23:36 +0200 Subject: [PATCH] improve tor usage to download video --- main.py | 16 +-- multi_crawler/__init__.py | 1 - multi_crawler/crawlers/youtube_crawler.py | 46 ++++--- multi_crawler/exports/csv_exporter.py | 27 +++- multi_crawler/session.py | 42 ------ multi_crawler/ytb_session.py | 157 ++++++++++------------ 6 files changed, 125 insertions(+), 164 deletions(-) delete mode 100644 multi_crawler/session.py diff --git a/main.py b/main.py index 705e430..d6a02eb 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,9 @@ import argparse import logging -import os from dotenv import load_dotenv -from multi_crawler import ( - ArchiveCrawler, - CSVExporter, - Session, - TorSession, - YoutubeCrawler, -) +from multi_crawler import ArchiveCrawler, CSVExporter, YoutubeCrawler logging.basicConfig(level=logging.INFO) load_dotenv(override=True) @@ -62,14 +55,9 @@ line = line.strip() logging.info("Processing line: %s", line) - session = ( - TorSession(os.getenv("TOR_PASSWORD")) if args.tor_proxy else Session() - ) if line.startswith("youtube:"): crawlers = YoutubeCrawler( - line.split(" ", 1)[1], - callback=exporter, - session=session, + line.split(" ", 1)[1], callback=exporter, num_processes=5 ) else: crawlers = ArchiveCrawler(line.split(" ", 1)[1], callback=exporter) diff --git a/multi_crawler/__init__.py b/multi_crawler/__init__.py index 6b727ae..70d282c 100644 --- a/multi_crawler/__init__.py +++ b/multi_crawler/__init__.py @@ -1,3 +1,2 @@ from .crawlers import * from .exports import * -from .session import * diff --git a/multi_crawler/crawlers/youtube_crawler.py b/multi_crawler/crawlers/youtube_crawler.py index 3663300..351b2f8 100644 --- a/multi_crawler/crawlers/youtube_crawler.py +++ b/multi_crawler/crawlers/youtube_crawler.py @@ -4,10 +4,11 @@ from concurrent.futures import ThreadPoolExecutor from typing import Callable, Sequence +import requests + from ..models import Audio -from ..session import Session, TorSession +from ..ytb_session import YtbSession from .crawlers import BaseCrawler -from ..ytb_session import YtbDLSession class YoutubeCrawler(BaseCrawler): @@ -19,30 +20,39 @@ def __init__( self, terms: Sequence[str], callback: Callable, - session: Session = Session, process: bool = False, + num_processes: int = 10, ): self._terms = terms self._callback = callback - self._session = session self._process = process + self._num_processes = num_processes self.logging = logging.getLogger(__name__) - - self._nbm_requests = 0 - - self.ytb_dl_session = YtbDLSession( - isinstance(self._session, TorSession), session=self._session - ) + self._ytb_sessions = { + time.time(): YtbSession( + {"quiet": True, "noprogress": True, "no_warnings": True} + ) + for _ in range(num_processes) + } # Create a thread pool with max 10 threads - self.executor = ThreadPoolExecutor(max_workers=10) + self.executor = ThreadPoolExecutor(max_workers=num_processes) self.futures = set() def _get_ytb_data(self, url): - info = self.ytb_dl_session.extract_info(url) - - if not "Music" in info["categories"]: + # get the oldest session + session = self._ytb_sessions.pop(min(self._ytb_sessions.keys())) + # append a new session + self._ytb_sessions[time.time()] = session + + info = session.extract_info(url, download=False) + + if ( + "categories" in info + and info["categories"] is not None + and not "Music" in info["categories"] + ): logging.info("Skipping non-music video: %s", info["title"]) return @@ -88,7 +98,7 @@ def crawl(self, *args, **kwargs) -> None: url = f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}" # Ensure we don't have more than 10 active threads - while len(self.futures) >= 10: + while len(self.futures) > self._num_processes: self._manage_futures() time.sleep(0.1) # Sleep briefly to avoid tight loop @@ -110,7 +120,7 @@ def _get_youtube_init_data(self, url): context = None try: - response = self._session.get_session().get(url) + response = requests.get(url) page_content = response.text yt_init_data = page_content.split("var ytInitialData =") @@ -189,9 +199,7 @@ def _next_page(self, next_page): endpoint = f"{self.YOUTUBE_ENDPOINT}/youtubei/v1/search?key={next_page['nextPageToken']}" try: - response = self._session.get_session().post( - endpoint, json=next_page["nextPageContext"] - ) + response = requests.post(endpoint, json=next_page["nextPageContext"]) page_data = response.json() item1 = page_data["onResponseReceivedCommands"][0][ diff --git a/multi_crawler/exports/csv_exporter.py b/multi_crawler/exports/csv_exporter.py index 7907ec8..71084b6 100644 --- a/multi_crawler/exports/csv_exporter.py +++ b/multi_crawler/exports/csv_exporter.py @@ -5,7 +5,7 @@ import csv import os -from typing import List +from typing import Any from ..models import Audio @@ -23,6 +23,25 @@ def __init__(self, filename: str, overwrite: bool = False): writer = csv.writer(f) writer.writerow(self._columns) + def _clean_value(self, value: Any) -> Any: + """Method to clean the value before writing it to the CSV file. + + Args: + value (Any): the value to clean + + Returns: + Any: the cleaned value + """ + + if isinstance(value, str): + return value.replace("\n", " ") + + if isinstance(value, list): + value = ", ".join(value) + value = value.replace("\n", " ") + + return value + def __call__(self, audio: Audio): """Write the information of the audio to the CSV file. @@ -36,7 +55,11 @@ def __call__(self, audio: Audio): # Write the values of the audio object to the CSV file writer.writerow( [ - "" if getattr(audio, field) is None else getattr(audio, field) + ( + "" + if getattr(audio, field) is None + else self._clean_value(getattr(audio, field)) + ) for field in self._columns ] ) diff --git a/multi_crawler/session.py b/multi_crawler/session.py deleted file mode 100644 index 79f951e..0000000 --- a/multi_crawler/session.py +++ /dev/null @@ -1,42 +0,0 @@ -import logging - -import requests -from requests import Session -from stem import Signal -from stem.control import Controller - -logger = logging.getLogger("stem") -logger.disabled = True - - -class Session: - """ - Create a new session object. - """ - - def get_session(self) -> Session: - return requests.Session() - - -class TorSession(Session): - """Create a new session object with Tor proxy settings.""" - - def __init__(self, pswd: str) -> None: - super().__init__() - self.__pswd = pswd - - def renew_connection(self) -> None: - """Renew the Tor connection.""" - with Controller.from_port(port=9051) as controller: - controller.authenticate(password=self.__pswd) - controller.signal(Signal.NEWNYM) - - def get_session(self) -> Session: - """Get a new session object with Tor proxy settings.""" - session = requests.Session() - session.proxies = { - "http": "socks5://127.0.0.1:9050", - "https": "socks5://127.0.0.1:9050", - } - - return session diff --git a/multi_crawler/ytb_session.py b/multi_crawler/ytb_session.py index ff2854e..f1c7b91 100644 --- a/multi_crawler/ytb_session.py +++ b/multi_crawler/ytb_session.py @@ -1,94 +1,79 @@ +""" +Wrapper for downloading videos from Youtube using Tor as a proxy. +""" + import logging -import subprocess -import time -from queue import Queue -from threading import Thread +import random +from typing import Any import yt_dlp +from yt_dlp.utils import DownloadError -class YtbDLSession: - """ - Class to manage a YouTube-DL session with a pool of tokens. - """ - - def __init__(self, is_tor_session: bool, po_token_reserve: int = 5, session=None): - self.is_tor_session = is_tor_session - self.po_token_queue = Queue() - self.ytb_dl = None - self.po_token_reserve = po_token_reserve - self.token_generator_thread = Thread( - target=self._token_generator_worker, daemon=True - ) - self._session = session - self.token_generator_thread.start() - - def _token_generator_worker(self): - while True: - if self.po_token_queue.qsize() < self.po_token_reserve: - new_token = self._generate_poo() - self.po_token_queue.put(new_token) - else: - time.sleep(1) # Sleep to avoid constant CPU usage - - def _create_ytb_dl(self): - settings = { - "proxy": "socks5://127.0.0.1:9050" if self.is_tor_session else None, - "po_token": f"web+{self._get_po_token()}", - "nocheckcertificate": True, - "quiet": True, - "noprogress": True, - } - - self.ytb_dl = yt_dlp.YoutubeDL(settings) - - def _get_po_token(self): - if len(self.po_token_queue.queue) == 0: - logging.warning("No poo token available. Waiting...") - - while len(self.po_token_queue.queue) == 0: - time.sleep(1) - - return self.po_token_queue.get() - - def _generate_poo(self): - logging.info("Generating poo token") - result = subprocess.run( - ["./multi_crawler/scripts/poo_gen.sh"], - capture_output=True, - text=True, - check=True, - ) - - result = result.stdout.strip() - - if "warning" in result: - logging.warning("Failed to generate poo token. Retrying...") - return self._generate_poo() - - poo_token = result.split("po_token: ")[1].split("\n")[0] - logging.info("Generated poo token: %s", poo_token[:10] + "...") - return poo_token.strip() - - def extract_info(self, url) -> dict: - """ - Extracts info from a given URL using the YouTube-DL session. - args: - url: URL to extract info from. - returns: - dict: Extracted info from the URL. - """ +class YtbSession: + """Wrapper class for YoutubeDL that uses Tor as a proxy.""" + + def __init__(self, params: dict = None, **kwargs): + """Initializes the TorWrapper with optional parameters. - logging.info("Extracting info from %s", url) - if self.ytb_dl is None: - self._create_ytb_dl() + Args: + params (dict, optional): Optional parameters for YoutubeDL. Defaults to None. + """ + self.params = params if params is not None else {} + self.kwargs = kwargs + self._init_ytdl() + + def _gen_proxy(self) -> str: + """Generates a random proxy string using Tor.""" + creds = str(random.randint(10000, 10**9)) + ":" + "foobar" + return f"socks5://{creds}@127.0.0.1:9050" + + def _init_ytdl(self): + """Initializes or reinitializes the YoutubeDL instance with a new proxy.""" + # Set a new proxy for each initialization + self.params["proxy"] = self._gen_proxy() + self.ytdl = yt_dlp.YoutubeDL(self.params, **self.kwargs) + logging.info("Initialized YoutubeDL with proxy %s", self.params["proxy"]) + + def _handle_download_error(self, method_name: str, *args, **kwargs) -> Any: + """Handles DownloadError by reinitializing and retrying the method call. + + Args: + method_name (str): The name of the method to call. + + Returns: + any: The return value of the method call. + """ + method = getattr(self.ytdl, method_name) try: - return self.ytb_dl.extract_info(url, download=False) - except yt_dlp.utils.DownloadError: - logging.warning("YouTube bot detection triggered. Updating session...") - if self.is_tor_session: - self._session.renew_connection() - - self._create_ytb_dl() - return self.extract_info(url) + return method(*args, **kwargs) + except DownloadError: + logging.warning( + "DownloadError in %s, reinitializing with new proxy...", method_name + ) + self._init_ytdl() + return self._handle_download_error(method_name, *args, **kwargs) + + def extract_info(self, *args, **kwargs): + """Extracts information and handles DownloadError by reinitializing YoutubeDL.""" + return self._handle_download_error("extract_info", *args, **kwargs) + + def download(self, *args, **kwargs): + """Downloads a video and handles DownloadError by reinitializing YoutubeDL.""" + return self._handle_download_error("download", *args, **kwargs) + + def download_with_info_file(self, *args, **kwargs): + """Downloads a video with an info file, handles DownloadError by reinitializing.""" + return self._handle_download_error("download_with_info_file", *args, **kwargs) + + def __getattr__(self, name: str) -> Any: + """Redirects attribute access to the YoutubeDL instance. + + Args: + name (str): The name of the attribute to access. + + Returns: + any: The attribute value. + """ + return getattr(self.ytdl, name)