From a4a4f99761f4739f1b6f0bbc634605bac7e2ad8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kr=C3=B3nos?= Date: Sat, 29 Jan 2022 13:00:11 +0100 Subject: [PATCH] Update __init__.py Aggiunte le notazioni --- animeworld/__init__.py | 658 +++++++++++++++++++++++++++-------------- 1 file changed, 436 insertions(+), 222 deletions(-) diff --git a/animeworld/__init__.py b/animeworld/__init__.py index 16a214d..d13cf33 100644 --- a/animeworld/__init__.py +++ b/animeworld/__init__.py @@ -6,16 +6,18 @@ import inspect import time import os +from typing import * HDR = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36'} cookies = {} - - ## Function ########################################## -def HealthCheck(fun): # Controlla se la libreria è deprecata +def HealthCheck(fun): + """ + Controlla se la libreria è deprecata + """ def wrapper(*args, **kwargs): try: return fun(*args, **kwargs) @@ -27,7 +29,19 @@ def wrapper(*args, **kwargs): return wrapper @HealthCheck -def find(keyword): +def find(keyword: str) -> Optional[Dict]: + """ + Ricerca un anime tramite le API interne di Animeworld. + + - `keyword`: Il nome dell'anime o una porzione di esso. + + ```python + return { + name: str, # Nome dell'anime trovato + link: str # Link dell'anime trovato + } + ``` + """ res = requests.get("https://www.animeworld.tv", headers = HDR) cookies.update(res.cookies.get_dict()) soupeddata = BeautifulSoup(res.content, "html.parser") @@ -54,232 +68,68 @@ def find(keyword): ######################################################## -## Class ############################################### - -class Anime: - # mapped = { - # 2:"DoodStream", - # 3:"VVVVID", - # 4:"YouTube", - # 8:"Streamtape", - # 9:"AnimeWorld Server", - # 10:"Beta Server", - # 11:"OkStream", - # 15:"NinjaStream", - # 17:"Userload", - # 18:"VUP" - # } - - def __init__(self, link): - self.link = link - self.__fixCookie() - self.html = self.__getHTML().content - self.__check404() - # self.server = self.__getServer() - # self.nome = self.getName() - # self.trama = self.getTrama() - # self.info = self.getInfo() - - # Private - def __fixCookie(self): - try: - soupeddata = BeautifulSoup(self.__getHTML().content, "html.parser") - - cookies['AWCookieVerify'] = re.search(r'document\.cookie="AWCookieVerify=(.+) ;', soupeddata.prettify()).group(1) - - except AttributeError: - pass - - # Private - def __getHTML(self): - r = None - while True: - try: - r = requests.get(self.link, headers = HDR, cookies=cookies, timeout=(3, 27), allow_redirects=True) - - cookies.update(r.cookies.get_dict()) - - if len(list(filter(re.compile(r'30[^2]').search, [str(x.status_code) for x in r.history]))): # se c'è un redirect strano - continue - - except requests.exceptions.ReadTimeout: - time.sleep(1) # errore - - else: - break - r.raise_for_status() - return r - - # Private - def __check404(self): - if self.html.decode("utf-8").find('Errore 404') != -1: raise Error404(self.link) - - # Private - @HealthCheck - def __getServer(self): # Provider dove sono hostati gli episodi - soupeddata = BeautifulSoup(self.html, "html.parser") - block = soupeddata.find("span", { "class" : "servers-tabs" }) - - if block == None: raise AnimeNotAvailable(self.getName()) - - providers = block.find_all("span", { "class" : "server-tab" }) - data = {int(x["data-name"]):{"name": x.get_text()} for x in providers} - return data - - @HealthCheck - def getTrama(self): # Trama dell'anime - soupeddata = BeautifulSoup(self.html, "html.parser") - return soupeddata.find("div", { "class" : "desc" }).get_text() - - @HealthCheck - def getInfo(self): # Informazioni dell'anime - soupeddata = BeautifulSoup(self.html, "html.parser") - block = soupeddata.find("div", { "class" : "info" }).find("div", { "class" : "row" }) - - tName = [x.get_text().replace(':', '') for x in block.find_all("dt")] - tInfo = [] - for x in block.find_all("dd"): - txt = x.get_text() - if len(txt.split(',')) > 1: - tInfo.append([x.strip() for x in txt.split(',')]) - else: - tInfo.append(txt.strip()) - - return dict(zip(tName, tInfo)) - - @HealthCheck - def getName(self): # Nome dell'anime - soupeddata = BeautifulSoup(self.html, "html.parser") - return soupeddata.find("h1", { "id" : "anime-title" }).get_text() - - ############# - - @HealthCheck - def getEpisodes(self): # Ritorna una lista di Episodi - soupeddata = BeautifulSoup(self.html, "html.parser") - - self.link = "https://www.animeworld.tv" + soupeddata.select_one('li.episode > a').get('href') - - soupeddata = BeautifulSoup(self.__getHTML().content, "html.parser") - - HDR.update({"csrf-token": soupeddata.find('meta', {'id': 'csrf-token'}).get('content')}) - - raw = {} # dati in formato semi-grezzo - eps = [] # Lista di Episodio() - - - provLegacy = self.__getServer() # vecchio sistema di cattura server - - raw = {} - for liElem in soupeddata.find_all("li", {"class": "episode"}): - aElem = liElem.find('a') - raw[aElem.get('data-episode-num')] = { - "episodeId": aElem.get('data-episode-id') - } - - for provID in provLegacy: - provLegacy[provID]["soup"] = soupeddata.find("div", {"class": "server", "data-name": str(provID)}) - - for ep in raw: - epNum =ep - epID = raw[ep]["episodeId"] - - legacy_links = [] - - - for provID in provLegacy: - legacy_links.append({ - "id": int(provID), - "name": provLegacy[provID]["name"], - "link": "https://www.animeworld.tv" + provLegacy[provID]["soup"].find('a', {'data-episode-num': ep}).get("href") - }) - - - ep = Episodio(epNum, f"https://www.animeworld.tv/api/download/{epID}", legacy_links) - eps.append(ep) - - return eps - -class Episodio: - def __init__(self, number, link, legacy): - self.number = number # Numero dell'episodio - self.link = link # link post API - self.legacy = legacy # vecchio sistema - - @property - def links(self): # lista dei provider dove sono hostati gli ep - tmp = [] # tutti i links - - res = requests.post(self.link, headers = HDR, cookies=cookies, timeout=(3, 27)) - data = res.json() - - for provID in data["links"]: - key = [x for x in data["links"][provID].keys() if x != 'server'][0] - tmp.append({ - "id": int(provID), - "name": data["links"][provID]["server"]["name"], - "link": data["links"][provID][key]["link"] - }) - - for prov in self.legacy: - if str(prov['id']) in data["links"].keys(): continue - - tmp.append(prov) +### Server ### - return self.__setServer(tmp, self.number) +class Server: + def __init__(self, link: str, Nid: int, name: str, number: str): + """ + Costruisce la classe Astratta Server. - def download(self, title=None, folder=''): # Scarica l'episodio con il primo link nella lista - - return self.links[0].download(title,folder) + - `link`: Link del server in cui è hostato l'episodio. + - `Nid`: ID del server. + - `name`: Nome del server. + - `number`: Numero dell'episodio + """ - # Private - def __setServer(self, links, numero): # Per ogni link li posizioni nelle rispettive classi - ret = [] # lista dei server - for prov in links: - if prov["id"] == 3: - ret.append(VVVVID(prov["link"], prov["id"], prov["name"], numero)) - elif prov["id"] == 4: - ret.append(YouTube(prov["link"], prov["id"], prov["name"], numero)) - elif prov["id"] == 9: - ret.append(AnimeWorld_Server(prov["link"], prov["id"], prov["name"], numero)) - elif prov["id"] == 8: - ret.append(Streamtape(prov["link"], prov["id"], prov["name"], numero)) - else: - ret.append(Server(prov["link"], prov["id"], prov["name"], numero)) - ret.sort(key=self.__sortServer) - return ret + self.link = link + """Link del server in cui è hostato l'episodio.""" + self.Nid = Nid + """ID del server.""" + self.name = name + """Nome del server.""" + self.number = number + """Numero dell'episodio""" - # Private - def __sortServer(self, elem): # Ordina i server per importanza - if isinstance(elem, VVVVID): return 0 - elif isinstance(elem, YouTube): return 1 - elif isinstance(elem, AnimeWorld_Server): return 2 - elif isinstance(elem, Streamtape): return 3 - else: return 4 + self._HDR = HDR # Protected + self._defTitle = f"{self.number} - {self.name}" # nome del file provvisorio + def sanitize(self, title: str) -> str: # Toglie i caratteri illegali per i file + """ + Rimuove i caratteri illegali per il nome del file. -### Server ### + - `title`: Nome del file. -class Server: - def __init__(self, link, Nid, name, numero): - self.link = link # Link del server dove è hostato l'episodio (str) - self.Nid = Nid # Id del server (int) - self.name = name # Nome del server - self.number = numero # Numero dell'episodio - self._HDR = HDR # Protected - self._defTitle = f"{self.number} - {self.name}" - - def sanitize(self, title): # Toglie i caratteri illegali per i file + ``` + return str # title sanitizzato + ``` + """ illegal = ['#','%','&','{','}', '\\','<','>','*','?','/','$','!',"'",'"',':','@','+','`','|','='] for x in illegal: title = title.replace(x, '') return title - def download(self, title=None, folder=''): + def download(self, title: Optional[str]=None, folder: str='') -> NoReturn: + """ + Scarica l'episodio. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + """ raise ServerNotSupported(self.name) # Protected - def _downloadIn(self, title, folder): # Scarica l'episodio + def _downloadIn(self, title: str, folder: str) -> bool: # Scarica l'episodio + """ + Scarica il file utilizzando requests. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ with requests.get(self._getFileLink(), headers = self._HDR, stream = True) as r: r.raise_for_status() with open(f"{os.path.join(folder,title)}.mp4", 'wb') as f: @@ -291,7 +141,17 @@ def _downloadIn(self, title, folder): # Scarica l'episodio return False # Se è accaduto qualche imprevisto # Protected - def _dowloadEx(self, title, folder): # Scarica l'episodio con l'utilizzo della libreria yutube_dl + def _dowloadEx(self, title: str, folder: str): + """ + Scarica il file utilizzando yutube_dl. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ class MyLogger(object): def debug(self, msg): pass @@ -321,7 +181,17 @@ def _getFileLink(self): return self.link.replace('download-file.php?id=', '') - def download(self, title=None, folder=''): + def download(self, title: Optional[str]=None, folder: str='') -> bool: + """ + Scarica l'episodio. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ if title is None: title = self._defTitle else: title = self.sanitize(title) return self._downloadIn(title,folder) @@ -344,7 +214,17 @@ def _getFileLink(self): raw = soupeddata.find("a", { "class" : "VVVVID-link" }) return raw.get("href") - def download(self, title=None, folder=''): + def download(self, title: Optional[str]=None, folder: str='') -> bool: + """ + Scarica l'episodio. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ if title is None: title = self._defTitle else: title = self.sanitize(title) return self._dowloadEx(title,folder) @@ -370,7 +250,17 @@ def _getFileLink(self): return yutubelink_raw.replace('embed/', 'watch?v=') - def download(self, title=None, folder=''): + def download(self, title: Optional[str]=None, folder: str='') -> bool: + """ + Scarica l'episodio. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ if title is None: title = self._defTitle else: title = self.sanitize(title) return self._dowloadEx(title,folder) @@ -399,34 +289,358 @@ def _getFileLink(self): return mp4_link - def download(self, title=None, folder=''): + def download(self, title: Optional[str]=None, folder: str='') -> bool: + """ + Scarica l'episodio. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ if title is None: title = self._defTitle else: title = self.sanitize(title) return self._downloadIn(title,folder) +## Class ############################################### + +class Episodio: + def __init__(self, number: str, link: str, legacy: List[Dict] = []): + """ + Costruisce la classe Episodio. + + - `number`: Numero dell'episodio. + - `link`: Link dell'endpoint dell'episodio. + - `legacy`: Lista di tutti i link dei server in cui sono hostati gli episodi. + """ + self.number = number + """Numero dell'episodio.""" + self.link = link + """Link dell'endpoint dell'episodio.""" + self.legacy = legacy + """Lista di tutti i link dei server in cui sono hostati gli episodi.""" + + @property + def links(self) -> List[Server]: # lista dei provider dove sono hostati gli ep + """ + Ottiene la lista dei server in cui è hostato l'episodio. + + ```python + return [ + Server, # Classe Server + ... + ] + ``` + """ + tmp = [] # tutti i links + + res = requests.post(self.link, headers = HDR, cookies=cookies, timeout=(3, 27)) + data = res.json() + + for provID in data["links"]: + key = [x for x in data["links"][provID].keys() if x != 'server'][0] + tmp.append({ + "id": int(provID), + "name": data["links"][provID]["server"]["name"], + "link": data["links"][provID][key]["link"] + }) + + for prov in self.legacy: + if str(prov['id']) in data["links"].keys(): continue + + tmp.append(prov) + + return self.__setServer(tmp, self.number) + + def download(self, title: Optional[str]=None, folder: str='') -> Union[bool, NoReturn]: # Scarica l'episodio con il primo link nella lista + """ + Scarica l'episodio dal primo server della lista links. + + - `title`: Nome con cui verrà nominato il file scaricato. + - `folder`: Posizione in cui verrà spostato il file scaricato. + + ``` + return bool # File scaricato + ``` + """ + + return self.links[0].download(title,folder) + + # Private + def __setServer(self, links: List[Dict], numero: str) -> List[Server]: # Per ogni link li posizioni nelle rispettive classi + """ + Costruisce la rispettiva classe Server per ogni link passato. + + - `links`: Dizionario ('id', 'name', 'link') contenente le informazioni del Server in cui è hostato l'episodio. + - `numero`: Numero dell'episodio. + + ```python + return [ + Server, # Classe Server + ... + ] + ``` + """ + ret = [] # lista dei server + for prov in links: + if prov["id"] == 3: + ret.append(VVVVID(prov["link"], prov["id"], prov["name"], numero)) + elif prov["id"] == 4: + ret.append(YouTube(prov["link"], prov["id"], prov["name"], numero)) + elif prov["id"] == 9: + ret.append(AnimeWorld_Server(prov["link"], prov["id"], prov["name"], numero)) + elif prov["id"] == 8: + ret.append(Streamtape(prov["link"], prov["id"], prov["name"], numero)) + else: + ret.append(Server(prov["link"], prov["id"], prov["name"], numero)) + ret.sort(key=self.__sortServer) + return ret + + # Private + def __sortServer(self, elem): + """ + Ordina i server per importanza. + """ + if isinstance(elem, VVVVID): return 0 + elif isinstance(elem, YouTube): return 1 + elif isinstance(elem, AnimeWorld_Server): return 2 + elif isinstance(elem, Streamtape): return 3 + else: return 4 + ######################################################## + +class Anime: + def __init__(self, link: str): + """ + Costruisce la classe Anime. + + - `link`: Il link dell'anime. + """ + self.link = link + self.__fixCookie() + self.html = self.__getHTML().content + self.__check404() + # self.server = self.__getServer() + # self.nome = self.getName() + # self.trama = self.getTrama() + # self.info = self.getInfo() + + # Private + def __fixCookie(self): + """ + Aggiorna il cookie `AWCookieVerify`. + """ + try: + soupeddata = BeautifulSoup(self.__getHTML().content, "html.parser") + + cookies['AWCookieVerify'] = re.search(r'document\.cookie="AWCookieVerify=(.+) ;', soupeddata.prettify()).group(1) + + except AttributeError: + pass + + # Private + def __getHTML(self) -> requests.Response: + """ + Ottiene la pagina web di Animeworld dell'anime e aggiorna i cookies. + """ + r = None + while True: + try: + r = requests.get(self.link, headers = HDR, cookies=cookies, timeout=(3, 27), allow_redirects=True) + + cookies.update(r.cookies.get_dict()) + + if len(list(filter(re.compile(r'30[^2]').search, [str(x.status_code) for x in r.history]))): # se c'è un redirect strano + continue + + except requests.exceptions.ReadTimeout: + time.sleep(1) # errore + + else: + break + r.raise_for_status() + return r + + # Private + def __check404(self): + """ + Controlla se la pagina è una pagina 404. + """ + if self.html.decode("utf-8").find('Errore 404') != -1: raise Error404(self.link) + + # Private + @HealthCheck + def __getServer(self) -> Dict[int, Dict[str, str]]: + """ + Ottiene tutti i server in cui sono hostati gli episodi. + + ```python + return { + int: { # ID del server + name: str # nome del server + }, + ... + } + ``` + """ + soupeddata = BeautifulSoup(self.html, "html.parser") + block = soupeddata.find("span", { "class" : "servers-tabs" }) + + if block == None: raise AnimeNotAvailable(self.getName()) + + providers = block.find_all("span", { "class" : "server-tab" }) + return { + int(x["data-name"]): { + "name": x.get_text() + } + for x in providers + } + + @HealthCheck + def getTrama(self) -> str: + """ + Ottiene la trama dell'anime. + + ```python + return str # trama dell'anime + ``` + """ + soupeddata = BeautifulSoup(self.html, "html.parser") + return soupeddata.find("div", { "class" : "desc" }).get_text() + + @HealthCheck + def getInfo(self) -> Dict[str, str]: # Informazioni dell'anime + """ + Ottiene le informazioni dell'anime. + + ```python + return { + 'Categoria': str, + 'Audio': str, + 'Data di Uscita': str, + 'Stagione': str, + 'Studio': str, + 'Genere': List[str], + 'Voto': str, + 'Durata': str, + 'Episodi': str, + 'Stato': str, + 'Visualizzazioni': str + } + ``` + """ + soupeddata = BeautifulSoup(self.html, "html.parser") + block = soupeddata.find("div", { "class" : "info" }).find("div", { "class" : "row" }) + + tName = [x.get_text().replace(':', '') for x in block.find_all("dt")] + tInfo = [] + for x in block.find_all("dd"): + txt = x.get_text() + if len(txt.split(',')) > 1: + tInfo.append([x.strip() for x in txt.split(',')]) + else: + tInfo.append(txt.strip()) + + return dict(zip(tName, tInfo)) + + @HealthCheck + def getName(self) -> str: # Nome dell'anime + """ + Ottiene il nome dell'anime. + + ```python + return str # nome dell'anime + ``` + """ + soupeddata = BeautifulSoup(self.html, "html.parser") + return soupeddata.find("h1", { "id" : "anime-title" }).get_text() + + ############# + + @HealthCheck + def getEpisodes(self) -> List[Episodio]: # Ritorna una lista di Episodi + """ + Ottiene tutti gli episodi dell'anime. + + ```python + return [ + Episodio, # Classe Episodio + ... + ] + ``` + """ + soupeddata = BeautifulSoup(self.html, "html.parser") + + self.link = "https://www.animeworld.tv" + soupeddata.select_one('li.episode > a').get('href') + + soupeddata = BeautifulSoup(self.__getHTML().content, "html.parser") + + HDR.update({"csrf-token": soupeddata.find('meta', {'id': 'csrf-token'}).get('content')}) + + raw = {} # dati in formato semi-grezzo + eps = [] # Lista di Episodio() + + + provLegacy = self.__getServer() # vecchio sistema di cattura server + + raw = {} + for liElem in soupeddata.find_all("li", {"class": "episode"}): + aElem = liElem.find('a') + raw[aElem.get('data-episode-num')] = { + "episodeId": aElem.get('data-episode-id') + } + + for provID in provLegacy: + provLegacy[provID]["soup"] = soupeddata.find("div", {"class": "server", "data-name": str(provID)}) + + for ep in raw: + epNum =ep + epID = raw[ep]["episodeId"] + + legacy_links = [] + + + for provID in provLegacy: + legacy_links.append({ + "id": int(provID), + "name": provLegacy[provID]["name"], + "link": "https://www.animeworld.tv" + provLegacy[provID]["soup"].find('a', {'data-episode-num': ep}).get("href") + }) + + + ep = Episodio(epNum, f"https://www.animeworld.tv/api/download/{epID}", legacy_links) + eps.append(ep) + + return eps + ## ERRORS ############################################# -class ServerNotSupported(Exception): # Il server da dove si tenta di scaricare l'episodio non è supportato +class ServerNotSupported(Exception): + """Il server da dove si tenta di scaricare l'episodio non è supportato.""" def __init__(self, server): self.server = server self.message = f"Il server {server} non è supportato." super().__init__(self.message) -class AnimeNotAvailable(Exception): # L'anime non è ancora disponibile +class AnimeNotAvailable(Exception): + """L'anime non è ancora disponibile.""" def __init__(self, animeName=''): self.anime = animeName self.message = f"L'anime '{animeName}' non è acora disponibile." super().__init__(self.message) class Error404(Exception): + """Il link porta ad una pagina inesistente.""" def __init__(self, link): self.link = link self.message = f"Il link '{link}' porta ad una pagina inesistente." super().__init__(self.message) class DeprecatedLibrary(Exception): + """Libreria deprecata a causa di un cambiamento della struttura del sito.""" def __init__(self, funName, line): self.funName = funName self.line = line