Skip to content

Commit

Permalink
Improve Tor usage for downloading videos
Browse files Browse the repository at this point in the history
  • Loading branch information
Jourdelune committed Sep 28, 2024
1 parent 2a51440 commit b802816
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 164 deletions.
16 changes: 2 additions & 14 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
import argparse
import logging
import os

from dotenv import load_dotenv

from multi_crawler import (
ArchiveCrawler,
CSVExporter,
Session,
TorSession,
YoutubeCrawler,
)
from multi_crawler import ArchiveCrawler, CSVExporter, YoutubeCrawler

logging.basicConfig(level=logging.INFO)
load_dotenv(override=True)
Expand Down Expand Up @@ -62,14 +55,9 @@
line = line.strip()
logging.info("Processing line: %s", line)

session = (
TorSession(os.getenv("TOR_PASSWORD")) if args.tor_proxy else Session()
)
if line.startswith("youtube:"):
crawlers = YoutubeCrawler(
line.split(" ", 1)[1],
callback=exporter,
session=session,
line.split(" ", 1)[1], callback=exporter, num_processes=5
)
else:
crawlers = ArchiveCrawler(line.split(" ", 1)[1], callback=exporter)
Expand Down
1 change: 0 additions & 1 deletion multi_crawler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from .crawlers import *
from .exports import *
from .session import *
46 changes: 27 additions & 19 deletions multi_crawler/crawlers/youtube_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence

import requests

from ..models import Audio
from ..session import Session, TorSession
from ..ytb_session import YtbSession
from .crawlers import BaseCrawler
from ..ytb_session import YtbDLSession


class YoutubeCrawler(BaseCrawler):
Expand All @@ -19,30 +20,39 @@ def __init__(
self,
terms: Sequence[str],
callback: Callable,
session: Session = Session,
process: bool = False,
num_processes: int = 10,
):
self._terms = terms
self._callback = callback
self._session = session
self._process = process
self._num_processes = num_processes

self.logging = logging.getLogger(__name__)

self._nbm_requests = 0

self.ytb_dl_session = YtbDLSession(
isinstance(self._session, TorSession), session=self._session
)
self._ytb_sessions = {
time.time(): YtbSession(
{"quiet": True, "noprogress": True, "no_warnings": True}
)
for _ in range(num_processes)
}

# Create a thread pool with max 10 threads
self.executor = ThreadPoolExecutor(max_workers=10)
self.executor = ThreadPoolExecutor(max_workers=num_processes)
self.futures = set()

def _get_ytb_data(self, url):
info = self.ytb_dl_session.extract_info(url)

if not "Music" in info["categories"]:
# get the oldest session
session = self._ytb_sessions.pop(min(self._ytb_sessions.keys()))
# append a new session
self._ytb_sessions[time.time()] = session

info = session.extract_info(url, download=False)

if (
"categories" in info
and info["categories"] is not None
and not "Music" in info["categories"]
):
logging.info("Skipping non-music video: %s", info["title"])
return

Expand Down Expand Up @@ -88,7 +98,7 @@ def crawl(self, *args, **kwargs) -> None:
url = f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}"

# Ensure we don't have more than 10 active threads
while len(self.futures) >= 10:
while len(self.futures) > self._num_processes:
self._manage_futures()
time.sleep(0.1) # Sleep briefly to avoid tight loop

Expand All @@ -110,7 +120,7 @@ def _get_youtube_init_data(self, url):
context = None

try:
response = self._session.get_session().get(url)
response = requests.get(url)
page_content = response.text

yt_init_data = page_content.split("var ytInitialData =")
Expand Down Expand Up @@ -189,9 +199,7 @@ def _next_page(self, next_page):
endpoint = f"{self.YOUTUBE_ENDPOINT}/youtubei/v1/search?key={next_page['nextPageToken']}"

try:
response = self._session.get_session().post(
endpoint, json=next_page["nextPageContext"]
)
response = requests.post(endpoint, json=next_page["nextPageContext"])
page_data = response.json()

item1 = page_data["onResponseReceivedCommands"][0][
Expand Down
27 changes: 25 additions & 2 deletions multi_crawler/exports/csv_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import csv
import os
from typing import List
from typing import Any

from ..models import Audio

Expand All @@ -23,6 +23,25 @@ def __init__(self, filename: str, overwrite: bool = False):
writer = csv.writer(f)
writer.writerow(self._columns)

def _clean_value(self, value: Any) -> Any:
"""Method to clean the value before writing it to the CSV file.
Args:
value (Any): the value to clean
Returns:
Any: the cleaned value
"""

if isinstance(value, str):
return value.replace("\n", " ")

if isinstance(value, list):
value = ", ".join(value)
value = value.replace("\n", " ")

return value

def __call__(self, audio: Audio):
"""Write the information of the audio to the CSV file.
Expand All @@ -36,7 +55,11 @@ def __call__(self, audio: Audio):
# Write the values of the audio object to the CSV file
writer.writerow(
[
"" if getattr(audio, field) is None else getattr(audio, field)
(
""
if getattr(audio, field) is None
else self._clean_value(getattr(audio, field))
)
for field in self._columns
]
)
42 changes: 0 additions & 42 deletions multi_crawler/session.py

This file was deleted.

157 changes: 71 additions & 86 deletions multi_crawler/ytb_session.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,79 @@
"""
Wrapper for downloading videos from Youtube using Tor as a proxy.
"""

import logging
import subprocess
import time
from queue import Queue
from threading import Thread
import random
from typing import Any

import yt_dlp
from yt_dlp.utils import DownloadError


class YtbDLSession:
"""
Class to manage a YouTube-DL session with a pool of tokens.
"""

def __init__(self, is_tor_session: bool, po_token_reserve: int = 5, session=None):
self.is_tor_session = is_tor_session
self.po_token_queue = Queue()
self.ytb_dl = None
self.po_token_reserve = po_token_reserve
self.token_generator_thread = Thread(
target=self._token_generator_worker, daemon=True
)
self._session = session
self.token_generator_thread.start()

def _token_generator_worker(self):
while True:
if self.po_token_queue.qsize() < self.po_token_reserve:
new_token = self._generate_poo()
self.po_token_queue.put(new_token)
else:
time.sleep(1) # Sleep to avoid constant CPU usage

def _create_ytb_dl(self):
settings = {
"proxy": "socks5://127.0.0.1:9050" if self.is_tor_session else None,
"po_token": f"web+{self._get_po_token()}",
"nocheckcertificate": True,
"quiet": True,
"noprogress": True,
}

self.ytb_dl = yt_dlp.YoutubeDL(settings)

def _get_po_token(self):
if len(self.po_token_queue.queue) == 0:
logging.warning("No poo token available. Waiting...")

while len(self.po_token_queue.queue) == 0:
time.sleep(1)

return self.po_token_queue.get()

def _generate_poo(self):
logging.info("Generating poo token")
result = subprocess.run(
["./multi_crawler/scripts/poo_gen.sh"],
capture_output=True,
text=True,
check=True,
)

result = result.stdout.strip()

if "warning" in result:
logging.warning("Failed to generate poo token. Retrying...")
return self._generate_poo()

poo_token = result.split("po_token: ")[1].split("\n")[0]
logging.info("Generated poo token: %s", poo_token[:10] + "...")
return poo_token.strip()

def extract_info(self, url) -> dict:
"""
Extracts info from a given URL using the YouTube-DL session.
args:
url: URL to extract info from.
returns:
dict: Extracted info from the URL.
"""
class YtbSession:
"""Wrapper class for YoutubeDL that uses Tor as a proxy."""

def __init__(self, params: dict = None, **kwargs):
    """Initializes the wrapper with optional YoutubeDL parameters.

    Args:
        params (dict, optional): Options forwarded to yt_dlp.YoutubeDL.
            Defaults to None (treated as an empty dict).
        **kwargs: Extra keyword arguments passed through to YoutubeDL.
    """
    # Copy defensively: _init_ytdl() injects a "proxy" key, and the
    # original assigned the caller's dict by reference, silently
    # mutating it on every (re)initialization.
    self.params = dict(params) if params is not None else {}
    self.kwargs = kwargs
    self._init_ytdl()

def _gen_proxy(self) -> str:
"""Generates a random proxy string using Tor."""
creds = str(random.randint(10000, 10**9)) + ":" + "foobar"
return f"socks5://{creds}@127.0.0.1:9050"

def _init_ytdl(self):
    """Create a fresh YoutubeDL instance behind a newly generated proxy."""
    proxy = self._gen_proxy()
    # A new proxy per (re)initialization rotates the Tor circuit.
    self.params["proxy"] = proxy
    self.ytdl = yt_dlp.YoutubeDL(self.params, **self.kwargs)
    logging.info("Initialized YoutubeDL with proxy %s", proxy)

def _handle_download_error(self, method_name: str, *args, **kwargs) -> Any:
    """Call a YoutubeDL method, retrying with a fresh proxy on DownloadError.

    Args:
        method_name (str): The name of the YoutubeDL method to call.
        *args: Positional arguments forwarded to the method.
        **kwargs: Keyword arguments forwarded to the method.

    Returns:
        Any: The return value of the underlying YoutubeDL method call.
    """
    # Iterate instead of recursing: the original re-entered itself on
    # every DownloadError, so a persistently failing URL would hit the
    # recursion limit and die with RecursionError. The retry-forever
    # semantics are unchanged; the method is re-resolved each pass so it
    # binds to the freshly created self.ytdl.
    while True:
        method = getattr(self.ytdl, method_name)
        try:
            return method(*args, **kwargs)
        except DownloadError:
            logging.warning(
                "DownloadError in %s, reinitializing with new proxy...", method_name
            )
            self._init_ytdl()

def extract_info(self, *args, **kwargs):
    """Proxy for YoutubeDL.extract_info with automatic retry on DownloadError."""
    retrying_call = self._handle_download_error
    return retrying_call("extract_info", *args, **kwargs)

def download(self, *args, **kwargs):
    """Proxy for YoutubeDL.download with automatic retry on DownloadError."""
    retrying_call = self._handle_download_error
    return retrying_call("download", *args, **kwargs)

def download_with_info_file(self, *args, **kwargs):
    """Proxy for YoutubeDL.download_with_info_file with retry on DownloadError."""
    retrying_call = self._handle_download_error
    return retrying_call("download_with_info_file", *args, **kwargs)

def __getattr__(self, name: str) -> Any:
    """Delegate unknown attribute lookups to the wrapped YoutubeDL instance.

    Args:
        name (str): The name of the attribute being accessed.

    Returns:
        Any: The attribute value from the underlying YoutubeDL object.

    Raises:
        AttributeError: If the attribute does not exist, or if the
            wrapper has not finished initializing.
    """
    # Guard: if "ytdl" itself is missing (e.g. a failed __init__, or a
    # lookup during unpickling/copying before state is restored), the
    # original `self.ytdl` access re-entered __getattr__ forever and
    # crashed with RecursionError instead of raising AttributeError.
    if name == "ytdl":
        raise AttributeError(name)
    return getattr(self.ytdl, name)

0 comments on commit b802816

Please sign in to comment.