From 1a1f32d62716daa05b39789208d0582c73c3c92d Mon Sep 17 00:00:00 2001 From: Santiago Ramirez Date: Mon, 22 Jul 2024 12:52:14 -0400 Subject: [PATCH] optimized code --- example/main.py | 143 ++++++++++--------------------------- quotexpy/__init__.py | 33 +++++---- quotexpy/api.py | 98 ++++++++++++------------- quotexpy/http/login.py | 4 +- quotexpy/http/qxbroker.py | 21 ++---- quotexpy/utils/__init__.py | 9 +++ quotexpy/ws/client.py | 4 +- 7 files changed, 121 insertions(+), 191 deletions(-) diff --git a/example/main.py b/example/main.py index 5cbb99c..81d145c 100644 --- a/example/main.py +++ b/example/main.py @@ -3,121 +3,62 @@ import random import asyncio import datetime -from pathlib import Path from termcolor import colored from quotexpy import Quotex +from quotexpy.utils import asset_parse, asrun from quotexpy.utils.account_type import AccountType +from quotexpy.utils.candles_period import CandlesPeriod from quotexpy.utils.operation_type import OperationType -from quotexpy.utils import asset_parse, sessions_file_path -asset_current = "EURUSD" +asset_current = "EURGBP" -def pin_code_handler() -> str: + +def on_pin_code() -> str: code = input("Enter the code sent to your email: ") return code -class SingletonDecorator: - """ - A decorator that turns a class into a singleton. - """ - - def __init__(self, cls): - self.cls = cls - self.instance = None - - def __call__(self, *args, **kwargs): - if self.instance is None: - self.instance = self.cls(*args, **kwargs) - return self.instance - - -@SingletonDecorator -class MyConnection: - """ - This class represents a connection object and provides methods for connecting to a client. - """ - - def __init__(self, client_instance: Quotex): - self.client = client_instance - - async def connect(self, attempts=5): - check = await self.client.connect() - if not check: - attempt = 0 - while attempt <= attempts: - if not self.client.check_connect(): - check = await self.client.connect() - if check: - print("Reconectado com sucesso!!!") - break - print("Erro ao reconectar.") - attempt += 1 - if Path(sessions_file_path).is_file(): - Path(sessions_file_path).unlink() - print(f"Tentando reconectar, tentativa {attempt} de {attempts}") - elif not check: - attempt += 1 - else: - break - await asyncio.sleep(5) - return check - return check - - def close(self): - """ - Closes the client connection. - """ - self.client.close() - - -def run(y): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - z = loop.run_until_complete(y) - return z - - -client = Quotex(email="", password="", on_pin_code=pin_code_handler) -client.debug_ws_enable = False +client = Quotex( + email="", + password="", + headless=True, + on_pin_code=on_pin_code, +) def check_asset(asset): asset_query = asset_parse(asset) - asset_open = client.check_asset_open(asset_query) + asset_open = client.check_asset(asset_query) if not asset_open or not asset_open[2]: print(colored("[WARN]: ", "yellow"), "Asset is closed.") asset = f"{asset}_otc" print(colored("[WARN]: ", "yellow"), "Try OTC Asset -> " + asset) asset_query = asset_parse(asset) - asset_open = client.check_asset_open(asset_query) + asset_open = client.check_asset(asset_query) return asset, asset_open async def get_balance(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: client.change_account(AccountType.PRACTICE) # "REAL" print(colored("[INFO]: ", "blue"), "Balance: ", client.get_balance()) print(colored("[INFO]: ", "blue"), "Exiting...") - prepare_connection.close() + client.close() async def balance_refill(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: result = await client.edit_practice_balance(100) print(result) - prepare_connection.close() + client.close() async def trade(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: client.change_account(AccountType.PRACTICE) amount = 1 @@ -134,7 +75,7 @@ async def trade(): print(colored("[WARN]: ", "yellow"), "Asset is closed.") print(colored("[INFO]: ", "blue"), "Balance: ", await client.get_balance()) print(colored("[INFO]: ", "blue"), "Exiting...") - prepare_connection.close() + client.close() async def wait_for_input_exceeding_x_seconds_limit(secounds=30): @@ -146,8 +87,7 @@ async def wait_for_input_exceeding_x_seconds_limit(secounds=30): async def trade_and_check_win(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: client.change_account(AccountType.PRACTICE) print(colored("[INFO]: ", "blue"), "Balance: ", await client.get_balance()) @@ -174,8 +114,7 @@ async def trade_and_check_win(): async def sell_option(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: client.change_account(AccountType.PRACTICE) amount = 30 @@ -187,50 +126,43 @@ async def sell_option(): await client.sell_option(buy_info["id"]) print(colored("[INFO]: ", "blue"), "Balance: ", await client.get_balance()) print(colored("[INFO]: ", "blue"), "Exiting...") - prepare_connection.close() + client.close() async def assets_open(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: for i in client.get_all_asset_name(): - print(i, client.check_asset_open(i)) - prepare_connection.close() + print(i, client.check_asset(i)) + client.close() async def get_payment(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: all_data = client.get_payment() for asset_name in all_data: asset_data = all_data[asset_name] print(asset_name, asset_data["payment"], asset_data["open"]) - prepare_connection.close() + client.close() -# import numpy as np async def get_candle_v2(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() - period = 100 + check_connect = await client.connect() if check_connect: global asset_current asset, asset_open = check_asset(asset_current) if asset_open[2]: print(colored("[INFO]: ", "blue"), "Asset is open.") - # 60 at 180 seconds - candles = await client.get_candle_v2(asset, period) + candles = await client.get_candle_v2(asset, CandlesPeriod.ONE_MINUTE) print(candles) else: print(colored("[INFO]: ", "blue"), "Asset is closed.") - prepare_connection.close() + client.close() async def get_realtime_candle(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: list_size = 10 global asset_current @@ -240,17 +172,16 @@ async def get_realtime_candle(): if len(client.get_realtime_candles(asset)) == list_size: break print(client.get_realtime_candles(asset)) - prepare_connection.close() + client.close() async def get_signal_data(): - prepare_connection = MyConnection(client) - check_connect = await prepare_connection.connect() + check_connect = await client.connect() if check_connect: while True: print(client.get_signal_data()) time.sleep(1) - prepare_connection.close() + client.close() async def main(): @@ -258,16 +189,16 @@ async def main(): # await get_signal_data() # await get_payment() # await get_payments_payout_more_than() - # await get_candle_v2() + await get_candle_v2() # await get_realtime_candle() # await assets_open() - await trade_and_check_win() + # await trade_and_check_win() # await balance_refill() if __name__ == "__main__": try: - run(main()) + asrun(main()) except KeyboardInterrupt: print("Aborted!") sys.exit(0) diff --git a/quotexpy/__init__.py b/quotexpy/__init__.py index b03f7bf..24bd1df 100644 --- a/quotexpy/__init__.py +++ b/quotexpy/__init__.py @@ -39,7 +39,7 @@ def __init__(self, email: str, password: str, **kwargs): self.subscribe_mood = [] self.websocket_client = None self.websocket_thread = None - self.debug_ws_enable = False + self.trace_ws = False self.logger = logging.getLogger(__name__) @@ -51,8 +51,19 @@ def websocket(self): """ return self.websocket_client.wss + async def connect(self) -> bool: + self.api = QuotexAPI(self.email, self.password, **self.kwargs) + self.api.trace_ws = self.trace_ws + ok = await self.api.connect() + if ok: + self.api.send_ssid(max_attemps=10) + if self.api.check_accepted_connection == 0: + ok = await self.connect() + + return ok + def check_connect(self): - if self.api.check_websocket_if_connect == 1: + if isinstance(self.api, QuotexAPI) and self.api.check_websocket_if_connect == 1: return True return False @@ -84,7 +95,6 @@ async def get_instruments(self): while self.api.instruments is None and time.time() - start < 10: await asyncio.sleep(0.1) except: - self.logger.error("api.get_instruments need reconnect") await self.connect() return self.api.instruments @@ -92,7 +102,7 @@ def get_all_asset_name(self): if self.api.instruments: return [instrument[2].replace("\n", "") for instrument in self.api.instruments] - def check_asset_open(self, asset: str) -> typing.Union[typing.Tuple[int, str, bool], None]: + def check_asset(self, asset: str) -> typing.Union[typing.Tuple[int, str, bool], None]: if isinstance(self.api, QuotexAPI) and self.api.instruments: for i in self.api.instruments: if asset == i[2]: @@ -113,7 +123,6 @@ async def get_candles(self, asset: str, offset: int, period: int) -> typing.List if self.api.candles.candles_data is not None: break except: - self.logger.error("get_candles need reconnect") await self.connect() return self.api.candles.candles_data @@ -127,17 +136,6 @@ async def get_candle_v2(self, asset: str, period: int) -> typing.List[typing.Uni return self.api.candle_v2_data[asset] - async def connect(self) -> bool: - self.api = QuotexAPI(self.email, self.password, **self.kwargs) - self.api.trace_ws = self.debug_ws_enable - ok = await self.api.connect() - if ok: - self.api.send_ssid(max_attemps=10) - if self.api.check_accepted_connection == 0: - ok = await self.connect() - - return ok - def change_account(self, mode=AccountType.PRACTICE) -> None: """Change active account `real` or `practice`""" if mode.upper() == AccountType.REAL: @@ -321,7 +319,8 @@ async def start_mood_stream(self, asset, instrument="turbo-option"): await asyncio.sleep(5) def close(self): - self.api.close() + if isinstance(self.api, QuotexAPI): + self.api.close() logging.basicConfig( diff --git a/quotexpy/api.py b/quotexpy/api.py index 99c3568..d67cfc2 100644 --- a/quotexpy/api.py +++ b/quotexpy/api.py @@ -76,8 +76,8 @@ class QuotexAPI(object): def __init__(self, email: str, password: str, **kwargs): """ - :param str email: The email of a Quotex server. - :param str password: The password of a Quotex server. + :param str email: The email of a Quotex account. + :param str password: The password of a Quotex account. """ self.email = email self.password = password @@ -109,28 +109,6 @@ def websocket(self): """ return self.websocket_client.wss - def get_candle_v2(self): - payload = {"_placeholder": True, "num": 0} - data = f'451-["history/list/v2", {json.dumps(payload)}]' - return self.send_websocket_request(data) - - def subscribe_realtime_candle(self, asset: str, period: int): - self.realtime_price[asset] = [] - payload = {"asset": asset, "period": period} - data = f'42["instruments/update", {json.dumps(payload)}]' - self.send_websocket_request(data) - payload = {"asset": asset, "period": period} - data = f'42["depth/follow", "{asset}"]' - self.send_websocket_request(data) - payload = {"asset": asset, "version": "1.0.0"} - data = f'42["chart_notification/get", {json.dumps(payload)}]' - self.send_websocket_request(data) - return self.send_websocket_request('42["tick"]') - - def unsubscribe_realtime_candle(self, asset): - data = f'42["subfor", {json.dumps(asset)}]' - return self.send_websocket_request(data) - @property def logout(self): """Property for get Quotex http login resource. @@ -176,17 +154,61 @@ def get_candles(self): """ return GetCandles(self) + async def connect(self) -> bool: + """Method for connection to Quotex API""" + self.ssl_Mutual_exclusion = False + self.ssl_Mutual_exclusion_write = False + if self.check_websocket_if_connect: + self.close() + ssid, self.cookies = await self.get_ssid() + check_websocket = self.start_websocket() + if not check_websocket: + return check_websocket + if not self.SSID: + self.SSID = ssid + + return check_websocket + def check_session(self) -> typing.Tuple[str, str]: data = {} if os.path.isfile(sessions_file_path): with open(sessions_file_path, "rb") as file: - data = pickle.load(file) + data: dict = pickle.load(file) - sessions = data.get(self.email, []) + sessions: list[dict] = data.get(self.email, []) for session in sessions: return session.get("ssid", ""), session.get("cookies", "") return "", "" + async def get_ssid(self) -> typing.Tuple[str, str]: + ssid, cookies = self.check_session() + if not ssid: + self.logger.info("authenticating user") + ssid, cookies = self.login(self.email, self.password, **self.kwargs) + return ssid, cookies + + def get_candle_v2(self) -> None: + payload = {"_placeholder": True, "num": 0} + data = f'451-["history/list/v2", {json.dumps(payload)}]' + self.send_websocket_request(data) + + def subscribe_realtime_candle(self, asset: str, period: int) -> None: + self.realtime_price[asset] = [] + payload = {"asset": asset, "period": period} + data = f'42["instruments/update", {json.dumps(payload)}]' + self.send_websocket_request(data) + payload = {"asset": asset, "period": period} + data = f'42["depth/follow", "{asset}"]' + self.send_websocket_request(data) + payload = {"asset": asset, "version": "1.0.0"} + data = f'42["chart_notification/get", {json.dumps(payload)}]' + self.send_websocket_request(data) + self.send_websocket_request('42["tick"]') + + def unsubscribe_realtime_candle(self, asset) -> None: + data = f'42["subfor", {json.dumps(asset)}]' + self.send_websocket_request(data) + def send_websocket_request(self, data, no_force_send=True) -> None: """Send websocket request to Quotex server. :param str data: The websocket request data. @@ -215,14 +237,6 @@ def edit_training_balance(self, amount) -> None: data = f'42["demo/refill",{json.dumps(amount)}]' self.send_websocket_request(data) - async def get_ssid(self) -> typing.Tuple[str, str]: - self.logger.info("authenticating user") - ssid, cookies = self.check_session() - if not ssid: - ssid, cookies = await self.login(self.email, self.password, **self.kwargs) - self.logger.info("login successful") - return ssid, cookies - def start_websocket(self) -> bool: self.check_websocket_if_connect = None self.check_websocket_if_error = False @@ -268,7 +282,7 @@ def send_ssid(self, max_attemps=10) -> bool: """ self.profile.msg = None if not self.SSID: - if os.path.exists(os.path.join(sessions_file_path)): + if os.path.exists(sessions_file_path): os.remove(sessions_file_path) return False @@ -285,20 +299,6 @@ def send_ssid(self, max_attemps=10) -> bool: raise QuotexTimeout(f"sending authorization with SSID {self.SSID} took too long to respond") return True - async def connect(self) -> bool: - """Method for connection to Quotex API""" - self.ssl_Mutual_exclusion = False - self.ssl_Mutual_exclusion_write = False - if self.check_websocket_if_connect: - self.close() - ssid, self.cookies = await self.get_ssid() - check_websocket = self.start_websocket() - if not check_websocket: - return check_websocket - if not self.SSID: - self.SSID = ssid - return check_websocket - def close(self) -> None: if self.websocket_client: self.websocket.close() diff --git a/quotexpy/http/login.py b/quotexpy/http/login.py index 2463160..9d97cd4 100644 --- a/quotexpy/http/login.py +++ b/quotexpy/http/login.py @@ -10,7 +10,7 @@ class Login(Browser): base_url = "qxbroker.com" https_base_url = f"https://{base_url}" - async def __call__(self, email: str, password: str, **kwargs): + def __call__(self, email: str, password: str, **kwargs): """ Method to get Quotex API login http request. :param str username: The username of a Quotex server. @@ -22,5 +22,5 @@ async def __call__(self, email: str, password: str, **kwargs): self.headless = kwargs.get("headless", True) self.on_pin_code = kwargs.get("on_pin_code", None) - self.ssid, self.cookies = self.get_cookies_and_ssid() + self.ssid, self.cookies = self.get_ssid_and_cookies() return self.ssid, self.cookies diff --git a/quotexpy/http/qxbroker.py b/quotexpy/http/qxbroker.py index 89e3afb..cb55229 100644 --- a/quotexpy/http/qxbroker.py +++ b/quotexpy/http/qxbroker.py @@ -1,6 +1,4 @@ import os -import re -import json import time import pickle import typing @@ -8,7 +6,6 @@ import random import requests from pathlib import Path -from bs4 import BeautifulSoup import undetected_chromedriver as uc from quotexpy.http.user_agents import agents from quotexpy.utils import sessions_file_path @@ -33,7 +30,7 @@ def __init__(self, api): self.api.user_agent if self.api.user_agent else user_agent_list[random.randint(0, len(user_agent_list) - 1)] ) - def get_cookies_and_ssid(self) -> typing.Tuple[str, str]: + def get_ssid_and_cookies(self) -> typing.Tuple[str, str]: try: options = uc.ChromeOptions() options.add_argument(f"--user-agent={self.user_agent}") @@ -76,22 +73,16 @@ def get_cookies_and_ssid(self) -> typing.Tuple[str, str]: if bc: raise QuotexAuthError("the pin code is incorrect") + wsd: typing.Union[dict, None] = self.browser.execute_script("return window.settings;") + if wsd is None or "token" not in wsd: + raise QuotexAuthError("incorrect username or password") + cookies = self.browser.get_cookies() self.api.cookies = cookies - soup = BeautifulSoup(self.browser.page_source, "html.parser") user_agent = self.browser.execute_script("return navigator.userAgent;") self.api.user_agent = user_agent - try: - script: str = soup.find_all("script", {"type": "text/javascript"})[1].get_text() - except Exception as exc: - raise QuotexAuthError("incorrect username or password") from exc - - match = re.sub("window.settings = ", "", script.strip().replace(";", "")) - - dx: dict = json.loads(match) - ssid = dx.get("token") - + ssid = wsd.get("token") cookiejar = requests.utils.cookiejar_from_dict({c["name"]: c["value"] for c in cookies}) cookie_string = "; ".join([f"{c.name}={c.value}" for c in cookiejar]) output_file = Path(sessions_file_path) diff --git a/quotexpy/utils/__init__.py b/quotexpy/utils/__init__.py index 6eee182..f03fd5a 100644 --- a/quotexpy/utils/__init__.py +++ b/quotexpy/utils/__init__.py @@ -1,6 +1,8 @@ import os import json import time +import typing +import asyncio def asset_parse(asset): @@ -24,6 +26,13 @@ def is_valid_json(mj): return True +def asrun(x: typing.Coroutine): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + r = loop.run_until_complete(x) + return r + + home_dir = os.path.expanduser("~") log_file_path = os.path.join(home_dir, ".quotexpy.log") sessions_file_path = os.path.join(home_dir, ".sessions.pkl") diff --git a/quotexpy/ws/client.py b/quotexpy/ws/client.py index ae7a246..f60288c 100644 --- a/quotexpy/ws/client.py +++ b/quotexpy/ws/client.py @@ -9,7 +9,7 @@ import websocket from quotexpy.http.user_agents import agents -from quotexpy.utils import is_valid_json, sessions_file_path +from quotexpy.utils import asrun, is_valid_json, sessions_file_path user_agent_list = agents.split("\n") @@ -174,7 +174,7 @@ def on_message(self, wss, wm): if str(self.api.wss_message) == "41": self.logger.info("disconnection event triggered by the platform, running automatic reconnection") self.api.check_websocket_if_connect = 0 - asyncio.run(self.api.reconnect()) + asrun(self.api.connect()) if "51-" in str(self.api.wss_message): self.api._temp_status = str(self.api.wss_message) elif self.api._temp_status == """451-["settings/list",{"_placeholder":true,"num":0}]""":