Skip to content

Commit

Permalink
post mulit request in the same time
Browse files Browse the repository at this point in the history
  • Loading branch information
Jourdelune committed Sep 26, 2024
1 parent aa3cb7f commit a6abfb9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
34 changes: 27 additions & 7 deletions multi_crawler/crawlers/youtube_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread
from typing import Callable, Sequence
Expand Down Expand Up @@ -44,8 +45,10 @@ def _create_ytb_dl(self):
self.ytb_dl = yt_dlp.YoutubeDL(settings)

def _get_po_token(self):
while len(self.po_token_queue.queue) == 0:
if len(self.po_token_queue.queue) == 0:
logging.warning("No poo token available. Waiting...")

while len(self.po_token_queue.queue) == 0:
time.sleep(1)

return self.po_token_queue.get()
Expand Down Expand Up @@ -110,12 +113,12 @@ def __init__(
isinstance(self._session, TorSession), session=self._session
)

# Create a thread pool with max 10 threads
self.executor = ThreadPoolExecutor(max_workers=10)
self.futures = set()

def _get_ytb_data(self, url):
try:
info = self.ytb_dl_session.extract_info(url)
except Exception as ex:
self.logging.error("Failed to extract info: %s", ex)
return
info = self.ytb_dl_session.extract_info(url)

if not "Music" in info["categories"]:
logging.info("Skipping non-music video: %s", info["title"])
Expand All @@ -132,6 +135,15 @@ def _get_ytb_data(self, url):

self._callback(audio)

def _manage_futures(self):
"""Helper function to clean up completed futures and maintain a max of 10 threads."""
# Check if any threads have finished and remove them
completed_futures = {fut for fut in self.futures if fut.done()}
for fut in completed_futures:
fut.result()

self.futures.remove(fut)

def crawl(self, *args, **kwargs) -> None:
"""Find and return URLs of Youtube videos based on search terms."""

Expand All @@ -151,7 +163,15 @@ def crawl(self, *args, **kwargs) -> None:
break # no more results

for item in results["items"]:
self._get_ytb_data(f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}")
url = f"{self.YOUTUBE_ENDPOINT}/watch?v={item['id']}"

# Ensure we don't have more than 10 active threads
while len(self.futures) >= 10:
self._manage_futures()
time.sleep(0.1) # Sleep briefly to avoid tight loop

future = self.executor.submit(self._get_ytb_data, url)
self.futures.add(future)

results_nbm += 1

Expand Down
1 change: 0 additions & 1 deletion smart-ipv6-rotator
Submodule smart-ipv6-rotator deleted from 8e2587

0 comments on commit a6abfb9

Please sign in to comment.