diff --git a/multi_crawler/crawlers/web_archive.py b/multi_crawler/crawlers/web_archive.py
index f58d122..8867cbb 100644
--- a/multi_crawler/crawlers/web_archive.py
+++ b/multi_crawler/crawlers/web_archive.py
@@ -6,6 +6,7 @@
 
 import internetarchive
 
+from ..models import Audio
 from .crawlers import BaseCrawler
 
 
@@ -33,6 +34,7 @@ def _find_url(self, item_id: str) -> None:
         """
         item = internetarchive.get_item(item_id)
 
+        # get each audio file and call the callback with the information
         for file in item.files:
             if "mp3" in file["format"].lower():
                 url = f"{self.BASE_URL}{item.identifier}/{file['name']}"
@@ -53,15 +55,19 @@ def _find_url(self, item_id: str) -> None:
                     metadata["genre"] = item.metadata["genre"]
 
                 if len(subject) > 0:
-                    metadata["keywords"] = ", ".join(subject)
+                    metadata["description"] = ", ".join(subject)
 
-                self._callback(url, **metadata)
+                metadata["url"] = url
+
+                audio = Audio(**metadata)
+                self._callback(url, audio)
 
 
     def crawl(self) -> None:
         """Search and extract ids"""
         search = internetarchive.search_items(f"collection:{self._collection}")
 
+        # sometimes a collection contains other collections
         if len(search) == 0:
             self._find_url(self._collection)
         else:
diff --git a/multi_crawler/crawlers/youtube_crawls.py b/multi_crawler/crawlers/youtube_crawls.py
index 398c3f7..d3f994f 100644
--- a/multi_crawler/crawlers/youtube_crawls.py
+++ b/multi_crawler/crawlers/youtube_crawls.py
@@ -1,6 +1,8 @@
+import re
 import urllib.parse
 from typing import Any, Callable, Dict, List, Sequence
 
+from ..models import Audio
 from ..session import Session
 from .crawlers import BaseCrawler
 
@@ -26,6 +28,29 @@ def __init__(
         self._callback = callback
         self._session = session
 
+    @staticmethod
+    def _get_description(content: str) -> str:
+        """Find and return the description of a Youtube video.
+
+        Args:
+            content (str): the content of the Youtube video page
+
+        Returns:
+            str: the description of the video
+        """
+
+        description_match = re.search(
+            r'attributedDescription":\{"content":"((?:\\.|[^"\\])*)"',
+            content,
+            re.DOTALL,
+        )
+
+        descr = ""
+        if description_match:
+            descr = description_match.group(1)
+
+        return descr
+
     @staticmethod
     def _get_contents(result: Dict[str, Any]) -> List[Dict[str, Any]]:
         """Find and return the contents of a Youtube search result.
@@ -37,6 +62,7 @@ def _get_contents(result: Dict[str, Any]) -> List[Dict[str, Any]]:
             List[Dict[str, Any]]: the contents of the search result
         """
 
+        # Check whether the result contains the contents directly
         if "contents" in result:
             return result["contents"]["twoColumnSearchResultsRenderer"][
                 "primaryContents"
@@ -88,17 +114,18 @@ def crawl(self, nb_results: int = float("inf")) -> None:
             response = session().post(self.YT_SEARCH_URL, headers=headers, json=data)
             response.raise_for_status()
 
-
             result = response.json()
 
             contents = self._get_contents(result)
 
+            # Iterate over the contents of the search result
             for content in contents:
                 if results_found >= nb_results:
                     break
                 if "itemSectionRenderer" in content:
                     items = content["itemSectionRenderer"]["contents"]
                     for item in items:
+                        # Check if there are no more results
                         if "messageRenderer" in item:
                             if (
                                 item["messageRenderer"]["text"]["runs"][0][
@@ -110,13 +137,33 @@
                             if results_found >= nb_results:
                                 break
 
+
                         if "videoRenderer" in item:
                             video_id = item["videoRenderer"].get("videoId")
                             if video_id:
+                                # Get the video information
+                                video_renderer = item["videoRenderer"]
                                 video_url = (
                                     f"https://www.youtube.com/watch?v={video_id}"
                                 )
-                                self._callback(video_url)
+
+                                title = video_renderer["title"]["runs"][0]["text"]
+                                channel_name = video_renderer["ownerText"]["runs"][0][
+                                    "text"
+                                ]
+
+                                video_page = session().get(video_url)
+                                description = self._get_description(video_page.text)
+
+                                audio = Audio(
+                                    url=video_url,
+                                    title=title,
+                                    author=channel_name,
+                                    description=description,
+                                )
+
+                                # Call the callback with the audio metadata
+                                self._callback(video_url, audio)
                             results_found += 1
                 elif "continuationItemRenderer" in content:
                     continuation_token = content["continuationItemRenderer"][
diff --git a/multi_crawler/models/__init__.py b/multi_crawler/models/__init__.py
new file mode 100644
index 0000000..7fdcb6f
--- /dev/null
+++ b/multi_crawler/models/__init__.py
@@ -0,0 +1 @@
+from .audio import Audio
diff --git a/multi_crawler/models/audio.py b/multi_crawler/models/audio.py
new file mode 100644
index 0000000..7dc677f
--- /dev/null
+++ b/multi_crawler/models/audio.py
@@ -0,0 +1,16 @@
+from typing import Optional
+
+from pydantic import BaseModel
+
+
+class Audio(BaseModel):
+    """
+    Audio model
+    """
+
+    url: str
+    title: Optional[str] = None
+    author: Optional[str] = None
+    description: Optional[str] = None
+    genre: Optional[str] = None
+    album: Optional[str] = None
diff --git a/requirements.txt b/requirements.txt
index 395dc1d..360282a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,6 @@
 requests[socks]
 python-dotenv
 stem
-internetarchive
\ No newline at end of file
+internetarchive
+pytube
+pydantic
diff --git a/test.py b/test.py
index 9074b6a..1a6871b 100644
--- a/test.py
+++ b/test.py
@@ -1,16 +1,8 @@
 from multi_crawler import ArchiveCrawler, CSVExporter, Session, YoutubeCrawler
+from multi_crawler.models import Audio
 
-i = 0
-
-
-def print_url(url: str, **kwargs):
-    global i
-    i += 1
-    print(url, i, kwargs.get("title"))
-
-
-exporter = CSVExporter("results.csv", "URL", overwrite=True)
-crawlers = ArchiveCrawler("ultra-japanese-sound-collection", callback=print_url)
-# crawlers = YoutubeCrawler("phonk", callback=exporter, session=Session)
+exporter = CSVExporter("results.csv", *Audio.model_fields.keys(), overwrite=True)
+# crawlers = ArchiveCrawler("ultra-japanese-sound-collection", callback=exporter)
+crawlers = YoutubeCrawler("phonk", callback=exporter, session=Session)
 
 crawlers.crawl()
diff --git a/test2.py b/test2.py
new file mode 100644
index 0000000..9d80d40
--- /dev/null
+++ b/test2.py
@@ -0,0 +1,24 @@
+# quick manual check of the description regex against a live video page
+import re
+
+import requests
+
+s = requests.Session()
+r = s.get("https://www.youtube.com/watch?v=o1A5hQZyuC4")
+
+
+def _get_description(content):
+    description_match = re.search(
+        r'attributedDescription":\{"content":"((?:[^"\\]|\\.)*?)"',
+        content,
+        re.DOTALL,
+    )
+
+    descr = ""
+    if description_match:
+        descr = description_match.group(1)
+
+    return descr
+
+
+print(_get_description(r.text))