diff --git a/hummingbot/client/command/balance_command.py b/hummingbot/client/command/balance_command.py index 7e04ebceed..761936bdab 100644 --- a/hummingbot/client/command/balance_command.py +++ b/hummingbot/client/command/balance_command.py @@ -79,13 +79,17 @@ async def show_balances( total_col_name = f"Total ({global_token_symbol})" sum_not_for_show_name = "sum_not_for_show" self.notify("Updating balances, please wait...") - network_timeout = float(self.client_config_map.commands_timeout.other_commands_timeout) + network_timeout = float(self.client_config_map.commands_timeout.graphene_timeout) if bool(self.client_config_map.commands_timeout.use_graphene) else float(self.client_config_map.commands_timeout.other_commands_timeout) try: all_ex_bals = await asyncio.wait_for( UserBalances.instance().all_balances_all_exchanges(self.client_config_map), network_timeout ) except asyncio.TimeoutError: - self.notify("\nA network error prevented the balances to update. See logs for more details.") + self.notify( + "\nA network error prevented the balances to update. " + "If you are using a graphene exchange change commands_timeout.use_graphene " + "to True. See logs for more details." + ) raise all_ex_avai_bals = UserBalances.instance().all_available_balances_all_exchanges() diff --git a/hummingbot/client/command/connect_command.py b/hummingbot/client/command/connect_command.py index 1f710ab202..9bd5d677d9 100644 --- a/hummingbot/client/command/connect_command.py +++ b/hummingbot/client/command/connect_command.py @@ -115,10 +115,16 @@ async def validate_n_connect_connector( await Security.wait_til_decryption_done() api_keys = Security.api_keys(connector_name) network_timeout = float(self.client_config_map.commands_timeout.other_commands_timeout) + graphene_timeout = float(self.client_config_map.commands_timeout.graphene_timeout) try: err_msg = await asyncio.wait_for( UserBalances.instance().add_exchange(connector_name, self.client_config_map, **api_keys), - network_timeout, + ( + network_timeout + if connector_name not in + ["bitshares", "peerplays", "bitshares_testnet", "peerplays_testnet"] + else graphene_timeout + ), ) except asyncio.TimeoutError: self.notify( diff --git a/hummingbot/client/command/create_command.py b/hummingbot/client/command/create_command.py index df5e6c00cd..4f48cbc959 100644 --- a/hummingbot/client/command/create_command.py +++ b/hummingbot/client/command/create_command.py @@ -357,7 +357,15 @@ async def verify_status( ): try: timeout = float(self.client_config_map.commands_timeout.create_command_timeout) - all_status_go = await asyncio.wait_for(self.status_check_all(), timeout) + graphene_timeout = float(self.client_config_map.commands_timeout.graphene_timeout) + all_status_go = await asyncio.wait_for(self.status_check_all(), timeout if not any( + [ + i in ["bitshares", "peerplays", "bitshares_testnet", "peerplays_testnet"] + for i in required_exchanges + ] + ) + else graphene_timeout + ) except asyncio.TimeoutError: self.notify("\nA network error prevented the connection check to complete. See logs for more details.") self.strategy_file_name = None diff --git a/hummingbot/client/command/exit_command.py b/hummingbot/client/command/exit_command.py index 5984a2a153..456ddc41a1 100644 --- a/hummingbot/client/command/exit_command.py +++ b/hummingbot/client/command/exit_command.py @@ -3,6 +3,7 @@ import asyncio from typing import TYPE_CHECKING +from hummingbot.connector.exchange.graphene.graphene_exchange import kill_metanode from hummingbot.core.utils.async_utils import safe_ensure_future if TYPE_CHECKING: @@ -12,6 +13,7 @@ class ExitCommand: def exit(self, # type: HummingbotApplication force: bool = False): + kill_metanode() safe_ensure_future(self.exit_loop(force), loop=self.ev_loop) async def exit_loop(self, # type: HummingbotApplication diff --git a/hummingbot/client/command/status_command.py b/hummingbot/client/command/status_command.py index 36d670e347..a8e609f2c5 100644 --- a/hummingbot/client/command/status_command.py +++ b/hummingbot/client/command/status_command.py @@ -173,8 +173,20 @@ async def status_check_all(self, # type: HummingbotApplication return False network_timeout = float(self.client_config_map.commands_timeout.other_commands_timeout) + graphene_timeout = float(self.client_config_map.commands_timeout.graphene_timeout) try: - invalid_conns = await asyncio.wait_for(self.validate_required_connections(), network_timeout) + invalid_conns = await asyncio.wait_for( + self.validate_required_connections(), + ( + network_timeout if not any( + [ + i in ["bitshares", "peerplays", "bitshares_testnet", "peerplays_testnet"] + for i in required_exchanges + ] + ) + else graphene_timeout + ) + ) except asyncio.TimeoutError: self.notify("\nA network error prevented the connection check to complete. See logs for more details.") raise diff --git a/hummingbot/client/config/client_config_map.py b/hummingbot/client/config/client_config_map.py index 1886c5bfb0..bea7062712 100644 --- a/hummingbot/client/config/client_config_map.py +++ b/hummingbot/client/config/client_config_map.py @@ -529,7 +529,7 @@ class GlobalTokenConfigMap(BaseClientModel): default="$", client_data=ClientFieldData( prompt=lambda - cm: "What is your default display token symbol? (e.g. $,€)", + cm: "What is your default display token symbol? (e.g. $, €, ₿, ¤)", ), ) @@ -572,6 +572,23 @@ class CommandsTimeoutConfigMap(BaseClientModel): ), ), ) + graphene_timeout: Decimal = Field( + default=Decimal("120"), + gt=Decimal("0"), + client_data=ClientFieldData( + prompt=lambda cm: ( + "Network timeout to apply to the starting and status checking of graphene" + ), + ) + ) + use_graphene: bool = Field( + default=False, + client_data=ClientFieldData( + prompt=lambda cm: ( + "Use longer timeout during balance check for Graphene exchanges (Yes/No)" + ), + ) + ) class Config: title = "commands_timeout" @@ -579,12 +596,22 @@ class Config: @validator( "create_command_timeout", "other_commands_timeout", + "graphene_timeout", pre=True, ) def validate_decimals(cls, v: str, field: Field): """Used for client-friendly error output.""" return super().validate_decimal(v, field) + @validator("use_graphene", pre=True) + def validate_bool(cls, v: str): + """Used for client-friendly error output.""" + if isinstance(v, str): + ret = validate_bool(v) + if ret is not None: + raise ValueError(ret) + return v + class AnonymizedMetricsMode(BaseClientModel, ABC): @abstractmethod diff --git a/hummingbot/client/settings.py b/hummingbot/client/settings.py index 8da7e3e683..d83e08dc04 100644 --- a/hummingbot/client/settings.py +++ b/hummingbot/client/settings.py @@ -299,9 +299,9 @@ def add_domain_parameter(self, params: Dict[str, Any]) -> Dict[str, Any]: def base_name(self) -> str: if self.is_sub_domain: - return self.parent_name + return self.parent_name if self.parent_name not in ["peerplays", "bitshares", "peerplays_testnet", "bitshares_testnet"] else "graphene" else: - return self.name + return self.name if self.name not in ["peerplays", "bitshares", "peerplays_testnet", "bitshares_testnet"] else "graphene" def non_trading_connector_instance_with_default_configuration( self, @@ -393,12 +393,13 @@ def create_connector_settings(cls): util_module = importlib.import_module(util_module_path) except ModuleNotFoundError: continue + name = "peerplays" if connector_dir.name == "graphene" else connector_dir.name trade_fee_settings: List[float] = getattr(util_module, "DEFAULT_FEES", None) trade_fee_schema: TradeFeeSchema = cls._validate_trade_fee_schema( - connector_dir.name, trade_fee_settings + name, trade_fee_settings ) - cls.all_connector_settings[connector_dir.name] = ConnectorSetting( - name=connector_dir.name, + cls.all_connector_settings[name] = ConnectorSetting( + name=name, type=ConnectorType[type_dir.name.capitalize()], centralised=getattr(util_module, "CENTRALIZED", True), example_pair=getattr(util_module, "EXAMPLE_PAIR", ""), @@ -415,7 +416,7 @@ def create_connector_settings(cls): for domain in other_domains: trade_fee_settings = getattr(util_module, "OTHER_DOMAINS_DEFAULT_FEES")[domain] trade_fee_schema = cls._validate_trade_fee_schema(domain, trade_fee_settings) - parent = cls.all_connector_settings[connector_dir.name] + parent = cls.all_connector_settings[name] cls.all_connector_settings[domain] = ConnectorSetting( name=domain, type=parent.type, diff --git a/hummingbot/connector/exchange/graphene/.gitignore b/hummingbot/connector/exchange/graphene/.gitignore new file mode 100644 index 0000000000..ce3f8c78bf --- /dev/null +++ b/hummingbot/connector/exchange/graphene/.gitignore @@ -0,0 +1,2 @@ +/database +!/database/__init__.py diff --git a/hummingbot/connector/exchange/graphene/__init__.py b/hummingbot/connector/exchange/graphene/__init__.py new file mode 100644 index 0000000000..792d600548 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/__init__.py @@ -0,0 +1 @@ +# diff --git a/hummingbot/connector/exchange/graphene/dummy.pxd b/hummingbot/connector/exchange/graphene/dummy.pxd new file mode 100644 index 0000000000..4b098d6f59 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/dummy.pxd @@ -0,0 +1,2 @@ +cdef class dummy(): + pass diff --git a/hummingbot/connector/exchange/graphene/dummy.pyx b/hummingbot/connector/exchange/graphene/dummy.pyx new file mode 100644 index 0000000000..4b098d6f59 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/dummy.pyx @@ -0,0 +1,2 @@ +cdef class dummy(): + pass diff --git a/hummingbot/connector/exchange/graphene/graphene_api_order_book_data_source.py b/hummingbot/connector/exchange/graphene/graphene_api_order_book_data_source.py new file mode 100644 index 0000000000..c1303228f1 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_api_order_book_data_source.py @@ -0,0 +1,351 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation, no-member, broad-except, no-name-in-module +# pylint: disable=arguments-differ +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_api_order_book_data_source v1.0.0 +~ +""" +# STANDARD MODULES +import asyncio +import logging +import time +from collections import defaultdict +from decimal import Decimal +from typing import Any, Dict, List, Mapping, Optional + +# METANODE MODULES +from metanode.graphene_metanode_client import GrapheneTrustlessClient + +# HUMMINGBOT MODULES +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.connector.exchange.graphene.graphene_order_book import GrapheneOrderBook +from hummingbot.core.api_throttler.async_throttler import AsyncThrottler +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import OrderBookMessage, OrderBookMessageType +from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource +from hummingbot.core.event.events import TradeType +from hummingbot.logger import HummingbotLogger + + +class GrapheneAPIOrderBookDataSource(OrderBookTrackerDataSource): + """ + connect to metanode to get bid, ask, and market history updates + """ + + _logger: Optional[HummingbotLogger] = None + _trading_pair_symbol_map: Dict[str, Mapping[str, str]] = {} + _mapping_initialization_lock = asyncio.Lock() + + def __init__( + self, + domain: str, + trading_pairs: List[str], + **__, + ): + # ~ print("GrapheneAPIOrderBookDataSource") + super().__init__(trading_pairs) + # ~ self._order_book_create_function = lambda: OrderBook() + self._order_book_create_function = OrderBook + + self._message_queue: Dict[str, asyncio.Queue] = defaultdict(asyncio.Queue) + + self.domain = domain + self.constants = GrapheneConstants(domain) + self.metanode = GrapheneTrustlessClient(self.constants) + + @classmethod + def logger(cls) -> HummingbotLogger: + """ + a classmethod for logging + """ + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + @staticmethod + async def get_last_traded_prices( + domain: str, + *_, + **__, + ) -> Dict[str, float]: + """ + Return a dictionary the trading_pair as key and the current price as value + for each trading pair passed as parameter + :param trading_pairs: list of trading pairs to get the prices for + :param domain: the name of the graphene blockchain + :return: Dictionary of associations between token pair and its latest price + """ + constants = GrapheneConstants(domain) + metanode = GrapheneTrustlessClient(constants) + metanode_pairs = metanode.pairs # DISCRETE SQL QUERY + return {k: v["last"] for k, v in metanode_pairs.items()} + + @staticmethod + async def get_all_mid_prices(domain: str) -> Dict[str, Decimal]: + """ + Returns the mid price of all trading pairs, + obtaining the information from the exchange. + This functionality is required by the market price strategy. + :param domain: the name of the graphene blockchain + :return: Dictionary with the trading pair as key, and the mid price as value + """ + constants = GrapheneConstants(domain) + metanode = GrapheneTrustlessClient(constants) + metanode_pairs = metanode.pairs # DISCRETE SQL QUERY + ret = [] + for pair in metanode_pairs: + ret[pair] = Decimal((pair["book"]["asks"][0] + pair["book"]["bids"][0]) / 2) + return ret + + @staticmethod + async def exchange_symbol_associated_to_pair( + trading_pair: str, + domain: str, + **__, + ) -> str: + """ + 1:1 mapping BASE-QUOTE + :param trading_pair: BASE-QUOTE + :param domain: the name of the graphene blockchain + :return: BASE-QUOTE + """ + symbol_map = await GrapheneAPIOrderBookDataSource.trading_pair_symbol_map( + domain=domain + ) + + return symbol_map.inverse[trading_pair] + + @staticmethod + async def trading_pair_associated_to_exchange_symbol( + symbol: str, + domain: str, + **__, + ) -> str: + """ + Used to translate a trading pair from exchange to client notation + :param symbol: trading pair in exchange notation + :param domain: the name of the graphene blockchain + :return: trading pair in client notation + """ + symbol_map = await GrapheneAPIOrderBookDataSource.trading_pair_symbol_map( + domain=domain + ) + return symbol_map[symbol] + + async def get_new_order_book(self, trading_pair: str) -> OrderBook: + """ + Creates a local instance of the exchange order book for one pair + :param trading_pair: BASE-QUOTE + :return: a local copy of the current order book in the exchange + """ + msg = await self.get_snapshot(trading_pair) + snapshot: OrderBookMessage = GrapheneOrderBook.snapshot_message_from_exchange( + msg=msg, + timestamp=time.time(), + metadata={ + "trading_pair": trading_pair, + "blocktime": self.metanode.timing["blocktime"], + }, + ) + book = self.order_book_create_function() + book.apply_snapshot(snapshot.bids, snapshot.asks, snapshot.update_id) + return book + + async def listen_for_trades( + self, ev_loop: asyncio.AbstractEventLoop, output: asyncio.Queue + ): + """ + reads the trade events queue, for each event + ~ creates a trade message instance + ~ adds it to the output queue + :param ev_loop: the event loop the method will run in + :param output: a queue to add the created trade messages + """ + # wait for metanode to intialize + while not 0 < time.time() - self.metanode.timing["blocktime"] < 60: + await self._sleep(1) + continue # SQL QUERY WHILE LOOP + previous_history = {pair: [] for pair in self.constants.chain.PAIRS} + while True: + try: + metanode_pairs = self.metanode.pairs + for pair in self.constants.chain.PAIRS: + if str(previous_history[pair]) != str( + metanode_pairs[pair]["history"] + ): + new_trades = [ + i + for i in metanode_pairs[pair]["history"] + if i not in previous_history[pair] + ] + for trade in new_trades: + # [unix, price, amount, trade_type, sequence] + trade_msg: OrderBookMessage = ( + GrapheneOrderBook.trade_message_from_exchange( + { + "trading_pair": pair, + "trade_type": trade[3], # trade_type + "trade_id": trade[4], # sequence + "update_id": trade[0], # unix + "price": trade[1], # price + "amount": trade[2], # amount + } + ) + ) + output.put_nowait(trade_msg) + previous_history = { + pair: metanode_pairs[pair]["history"] + for pair in self.constants.chain.PAIRS + } + await self._sleep(3) + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + self.logger().exception( + "Unexpected error when processing public trade updates from" + " exchange" + ) + + async def listen_for_order_book_diffs( + self, + *_, + **__, + ): + """ + N/A + """ + + async def listen_for_order_book_snapshots( + self, + ev_loop: asyncio.AbstractEventLoop, + output: asyncio.Queue, + ): + """ + This method runs continuously and requests the full order book content + from the exchange every 3 seconds via SQL query to the metanode database + It then creates a snapshot messages that are added to the output queue + :param ev_loop: the event loop the method will run in + :param output: a queue to add the created snapshot messages + """ + while True: + try: + for trading_pair in self.constants.chain.PAIRS: + try: + snapshot: Dict[str, Any] = await self.get_snapshot( + trading_pair=trading_pair + ) + snapshot_timestamp: float = time.time() + snapshot_msg: OrderBookMessage = ( + GrapheneOrderBook.snapshot_message_from_exchange( + snapshot, + snapshot_timestamp, + metadata={ + "trading_pair": trading_pair, + "blocktime": self.metanode.timing["blocktime"], + }, + ) + ) + output.put_nowait(snapshot_msg) + msg = f"Saved order book snapshot for {trading_pair}" + self.logger().debug(msg) + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + msg = ( + "Unexpected error fetching order book snapshot for" + f" {trading_pair}." + ) + self.logger().error(msg, exc_info=True) + await self._sleep(5.0) + await self._sleep(3) + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + self.logger().error("Unexpected error.", exc_info=True) + await self._sleep(5.0) + + async def listen_for_subscriptions(self): + """ + Graphene does not use this + """ + + async def get_snapshot( + self, + trading_pair: str, + **__, + ) -> Dict[str, Any]: + """ + Retrieves a copy of the full order book from the exchange, for one pair. + :param trading_pair: BASE-QUOTE + :param limit: the depth of the order book to retrieve + :return: the response from the exchange (JSON dictionary) + """ + metanode = GrapheneTrustlessClient(self.constants) + return metanode.pairs[trading_pair]["book"] # Discrete SQL Query + + @classmethod + def _get_throttler_instance(cls) -> AsyncThrottler: + return AsyncThrottler([]) # self.constants.RATE_LIMITS) + + @classmethod + def trade_message_from_exchange( + cls, msg: Dict[str, any], metadata: Optional[Dict] = None + ): + """ + Creates a trade message with info from each trade event sent by the exchange + :param msg: the trade event details sent by the exchange + :param metadata: a dictionary with extra information to add to trade message + :return: a trade message with details of the trade as provided by the exchange + """ + if metadata: + msg.update(metadata) + return OrderBookMessage( + OrderBookMessageType.TRADE, + { + "trading_pair": msg["trading_pair"], + "trade_type": ( + float(TradeType.SELL.value) + if msg["trade_type"] == "SELL" + else float(TradeType.BUY.value) + ), + "trade_id": msg["trade_id"], + "update_id": msg["update_id"], + "price": msg["price"], + "amount": msg["amount"], + }, + timestamp=int(time.time() * 1e-3), + ) + + @classmethod + async def _get_last_traded_price( + cls, + trading_pair: str, + domain: str, + **__, + ) -> float: + """ + Return a dictionary the trading_pair as key and the current price as value + for each trading pair passed as parameter + :param trading_pairs: list of trading pairs to get the prices for + :param domain: the name of the graphene blockchain + :param api_factory: N/A + :param throttler: N/A + :return: Dictionary of associations between token pair and its latest price + """ + + constants = GrapheneConstants(domain) + metanode = GrapheneTrustlessClient(constants) + return float(metanode.pairs[trading_pair]["last"]) # Discrete SQL Query diff --git a/hummingbot/connector/exchange/graphene/graphene_api_user_stream_data_source.py b/hummingbot/connector/exchange/graphene/graphene_api_user_stream_data_source.py new file mode 100644 index 0000000000..8c386fe342 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_api_user_stream_data_source.py @@ -0,0 +1,289 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation, no-member, no-name-in-module, too-many-function-args +# pylint: disable=too-many-branches, broad-except, too-many-locals +# pylint: disable=too-many-nested-blocks, too-many-statements +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_api_user_stream_data_source v1.0.0 +~ +""" +# STANDARD MODULES +import asyncio +import logging +import time +from typing import Optional + +# METANODE MODULES +from metanode.graphene_metanode_client import GrapheneTrustlessClient + +# HUMMINGBOT MODULES +from hummingbot.connector.client_order_tracker import ClientOrderTracker +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.core.data_type.in_flight_order import OrderState +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.logger import HummingbotLogger + +# GLOBAL CONSTANTS +DEV = False + + +class GrapheneAPIUserStreamDataSource(UserStreamTrackerDataSource): + """ + connect to metanode to get open order updates + """ + + _logger: Optional[HummingbotLogger] = None + + def __init__( + self, + # auth: GrapheneAuth, + domain: str, + order_tracker: ClientOrderTracker, + ): + # ~ print("GrapheneAPIUserStreamDataSource") + super().__init__() + self._current_listen_key = None + self._last_recv_time: float = 0 + self._order_tracker = order_tracker + self._ws_assistant = None + self.domain = domain + self.constants = GrapheneConstants(domain) + self.metanode = GrapheneTrustlessClient(self.constants) + + @classmethod + def logger(cls) -> HummingbotLogger: + """ + a classmethod for logging + """ + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def dev_log(self, *args, **kwargs): + """ + log only in dev mode + """ + if DEV: + self.logger().info(*args, **kwargs) + + @property + def last_recv_time(self) -> float: + """ + Returns the time of the last received message + :return: the timestamp of the last received message in seconds + """ + # ~ print("GrapheneAPIUserStreamDataSource last_recv_time") + if self._ws_assistant: + return self._ws_assistant.last_recv_time + return -1 + + async def listen_for_user_stream( + self, ev_loop: asyncio.AbstractEventLoop, output: asyncio.Queue + ): + """ + Connects to the user private channel in the DEX + With the established connection listens to all balance events + and order updates provided by the DEX, + and stores them in the output queue + """ + + def get_latest_events(): + + metanode_pairs = self.metanode.pairs # DISCRETE SQL QUERY + metanode_account = self.metanode.account # DISCRETE SQL QUERY + events = {} + for pair in self.constants.chain.PAIRS: + events[pair] = { + "fills": list(metanode_pairs[pair]["fills"]), + "opens": list(metanode_pairs[pair]["opens"]), + "creates": list(metanode_pairs[pair]["ops"]["creates"]), + "cancels": list(metanode_account["cancels"]), + } + # ~ self.dev_log(events) + return events + + # ~ print("GrapheneAPIUserStreamDataSource listen_for_user_stream") + # wait for metanode to intialize + while not 0 < time.time() - self.metanode.timing["blocktime"] < 60: + await self.sleep(1) + continue # SQL QUERY WHILE LOOP + # tare the scale upon initialization + novel = {} + latest = {} + removed = {} + previous = {} + events = get_latest_events() + for pair in self.constants.chain.PAIRS: + novel[pair] = {} + latest[pair] = {} + removed[pair] = {} + previous[pair] = {} + for event in list(events[pair].keys()): + previous[pair][event] = events[pair][event] + + while True: + try: + # create a 3 dimensional dataset; scope * pairs * events + # we may not need all of it, but it allows for future logic + events = get_latest_events() + tracked_orders = self._order_tracker.all_orders + self.dev_log(tracked_orders) + + for pair in self.constants.chain.PAIRS: + for event in list(events[pair].keys()): + # get the latest filled, opened, created, and cancelled orders + latest[pair][event] = events[pair][event] + # sort out the novel and removed orders updates + novel[pair][event] = [ + f + for f in latest[pair][event] + if f not in previous[pair][event] + ] + removed[pair][event] = [ + f + for f in previous[pair][event] + if f not in latest[pair][event] + ] + # reset previous state to current state + previous[pair][event] = list(latest[pair][event]) + # process novel user stream order data for this pair + for pair in self.constants.chain.PAIRS: + # handle recent partial + for fill_order in novel[pair]["fills"]: + + if fill_order["exchange_order_id"] in [ + tracked_order.exchange_order_id for tracked_order in tracked_orders.values() + ]: + new_state = ( + OrderState.PARTIALLY_FILLED + if fill_order in latest[pair]["opens"] + else OrderState.FILLED + ) + event_msg = { + "trading_pair": pair, + "execution_type": "TRADE", + "client_order_id": self._order_tracker.swap_id( + str(fill_order["exchange_order_id"]) + ), + "exchange_order_id": fill_order["exchange_order_id"], + # rpc database get_trade_history + # + # ~ {'sequence': 183490, + # ~ 'date': '2022-01-21T20:41:36', + # ~ 'price': '0.025376407606742865', + # ~ 'amount': '414.76319', + # ~ 'value': '10.5252', + # ~ 'type': 'sell', + # ~ 'side1_account_id':'1.2.1624289', + # ~ 'side2_account_id': '1.2.883283'} + # rpc history get_fill_order_history + # + # ~ {"id": "0.0.69", + # ~ "key": { + # ~ "base": "1.3.0", + # ~ "quote": "1.3.8", + # ~ "sequence": -5 + # ~ }, + # ~ "time.time": "2021-12-22T23:09:42", + # ~ "op": { + # ~ "fee": { + # ~ "amount": 0, + # ~ "asset_id": "1.3.8" + # ~ }, + # ~ "order_id": "1.7.181", + # ~ "account_id": "1.2.207", + # ~ "pays": { + # ~ "amount": 100000, + # ~ "asset_id": "1.3.0" + # ~ }, + # ~ "receives": { + # ~ "amount": 60000000, + # ~ "asset_id": "1.3.8" + # ~ }}} + # ~ fill_key_sequence = history_sequence ? + # ~ else: + # ~ trade_id = sha256( + # ~ + oldest_asset + # ~ + newest_asset + # ~ + oldest_account + # ~ + newest_account + # ~ + price + # ~ + amount*value + # ~ + amount+value + # ~ + unix + # ~ ) + "trade_id": str(), # needs to match OrderBookMessage + "fee_asset": fill_order["fee"]["asset"], + "fee_paid": fill_order["fee"]["amount"], + "fill_price": fill_order["price"], + "fill_timestamp": fill_order["unix"], + "fill_base_amount": fill_order["amount"], + "new_state": new_state, + } + output.put_nowait(event_msg) + + # handle recent cancellations + for cancel_order in novel[pair]["cancels"]: + + self.dev_log("CANCELS " + str(tracked_orders)) + if cancel_order["order_id"] in [ + v.exchange_order_id for k, v in tracked_orders.items() + ]: + event_msg = { + "trading_pair": pair, + "execution_type": None, + "client_order_id": self._order_tracker.swap_id( + str(cancel_order["order_id"]) + ), + "exchange_order_id": str(cancel_order["order_id"]), + "update_timestamp": int(time.time()), + "new_state": OrderState.CANCELED, + } + output.put_nowait(event_msg) + self.dev_log("CANCELS EVENT" + str(event_msg)) + # handle recent fully filled orders + for open_order in removed[pair]["opens"]: + self.dev_log("FILLS" + str(open_order)) + # removed[pair][opens] + # *could* exist before latest[pair][cancels] + # so wait a minute before confirming a FILL + await self._sleep(60) + metanode_account = self.metanode.account # DISCRETE SQL QUERY + # if open_order not in latest[pair]["cancels"] and open_order[ + if open_order not in metanode_account["cancels"] and open_order[ + "order_number" + ] in [v.exchange_order_id for k, v in tracked_orders.items()]: + + self.dev_log("FILLS ORDER" + str(open_order)) + event_msg = { + "trading_pair": pair, + "execution_type": None, + "client_order_id": self._order_tracker.swap_id( + str(open_order["order_number"]) + ), + "exchange_order_id": str(open_order["order_number"]), + "update_timestamp": int(time.time()), + "new_state": OrderState.FILLED, + } + output.put_nowait(event_msg) + self.dev_log("FILLS EVENT" + str(event_msg)) + self.dev_log("NOVEL" + str(pair) + str(novel)) + await self._sleep(1) + + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + + except Exception: + self.logger().exception( + "Unexpected error while listening to user stream. " + "Retrying after 5 seconds..." + ) + await self._sleep(5) diff --git a/hummingbot/connector/exchange/graphene/graphene_auth.py b/hummingbot/connector/exchange/graphene/graphene_auth.py new file mode 100644 index 0000000000..98f0277308 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_auth.py @@ -0,0 +1,42 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation + +r""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +auth = GrapheneAuth +order = auth.prototype_order(pair) +order["edicts"] = {} +broker(order) +""" +# METANODE MODULES +from metanode.graphene_auth import GrapheneAuth as MetanodeGrapheneAuth + +# HUMMINGBOT MODULES +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants + + +class GrapheneAuth(MetanodeGrapheneAuth): + """ + given a Wallet Import Format (WIF) Active (or Owner) Key + expose buy/sell/cancel methods: + ~ prototype_order() + ~ broker() + """ + + def __init__( + self, + wif: str, + domain: str, + ): + # ~ print("GrapheneAuth") + self.wif = wif + self.domain = domain + self.constants = GrapheneConstants(domain) + super().__init__(self.constants, wif) + self.carry_prints = True diff --git a/hummingbot/connector/exchange/graphene/graphene_constants.py b/hummingbot/connector/exchange/graphene/graphene_constants.py new file mode 100644 index 0000000000..bb34fc8764 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_constants.py @@ -0,0 +1,195 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=too-few-public-methods, bad-continuation +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +""" + +import json +import os + +# METANODE MODULES +from metanode.graphene_constants import GrapheneConstants as MetanodeGrapheneConstants +from metanode.graphene_utils import assets_from_pairs, invert_pairs, sls + + +class GrapheneConstants(MetanodeGrapheneConstants): + """ + used for user configuration to override Graphene default constants, exposes: + ~ self.chain + ~ self.metanode + ~ self.signing + ~ self.core + ~ self.hummingbot + for the most part the user will edit self.chain + """ + + def __init__( + self, + domain: str = "", + ): + # ~ print("GrapheneConstants", domain) + domain = domain.lower().replace("_", " ") + super().__init__( + chain_name=domain if domain else None + ) + self.hummingbot = HummingbotConfig + if domain != "": + # initialize config for this blockchain domain; eg. peerplays or bitshares + self.chains["peerplays"]["config"] = PeerplaysConfig + self.chains["bitshares"]["config"] = BitsharesConfig + self.chains["peerplays testnet"]["config"] = PeerplaysTestnetConfig + self.chains["bitshares testnet"]["config"] = BitsharesTestnetConfig + self.chain = self.chains[domain.lower()]["config"] + self.chain.NAME = domain.lower() + self.chain.CORE = self.chains[self.chain.NAME]["core"].upper() + self.chain.ID = self.chains[self.chain.NAME]["id"] + self.chain.NODES = [node.lower() for node in sls(self.chain.NODES)] + self.chain.DATABASE = ( + str(os.path.dirname(os.path.abspath(__file__))) + + "/database/" + + self.chain.NAME.replace(" ", "_") + + ".db" + ) + self.DATABASE_FOLDER = str(os.path.dirname(os.path.abspath(__file__))) + "/database/" + try: + with open(self.DATABASE_FOLDER + self.chain.NAME.replace(" ", "_") + "_pairs.txt", "r") as handle: + data = json.loads(handle.read()) + handle.close() + self.chain.PAIRS = data[0] + self.chain.ACCOUNT = data[1] + except FileNotFoundError: + pass + self.process_pairs() + self.core.PATH = str(os.path.dirname(os.path.abspath(__file__))) + self.chain.TITLE = self.chain.NAME.title() + if not hasattr(self.chain, "PREFIX"): + self.chain.PREFIX = self.chain.CORE + + def process_pairs(self): + self.chain.PAIRS = [ + i for i in self.chain.PAIRS if i not in invert_pairs(self.chain.PAIRS) + ] + self.chain.INVERTED_PAIRS = invert_pairs(self.chain.PAIRS) + self.chain.ASSETS = list(set(assets_from_pairs(self.chain.PAIRS) + [self.chain.CORE])) + self.chain.CORE_PAIRS = [ + i + for i in [ + self.chain.CORE + "-" + asset + for asset in self.chain.ASSETS + if asset != self.chain.CORE + ] + if i not in self.chain.PAIRS and i not in self.chain.INVERTED_PAIRS + ] + self.chain.INVERTED_CORE_PAIRS = invert_pairs(self.chain.CORE_PAIRS) + self.chain.ALL_PAIRS = ( + self.chain.PAIRS + + self.chain.CORE_PAIRS + + self.chain.INVERTED_PAIRS + + self.chain.INVERTED_CORE_PAIRS + ) + + +class HummingbotConfig: + """ + constants specific to this connector + """ + + SYNCHRONIZE = False + SNAPSHOT_SLEEP = 30 + ORDER_PREFIX = "" + + +class PeerplaysConfig: + """ + ╔═════════════════════════════╗ + ║ HUMMINGBOT GRAPHENE ║ + ║ ╔═╗╔═╗╔═╗╦═╗╔═╗╦ ╔═╗╦ ╦╔═╗ ║ + ║ ╠═╝║╣ ║╣ ╠╦╝╠═╝║ ╠═╣╚╦╝╚═╗ ║ + ║ ╩ ╚═╝╚═╝╩╚═╩ ╩═╝╩ ╩ ╩ ╚═╝ ║ + ║ DEX MARKET MAKING CONNECTOR ║ + ╚═════════════════════════════╝ + configuration details specific to peerplays mainnet + """ + + ACCOUNT = "" + NODES = [ + "wss://ca.peerplays.info/", + "wss://de.peerplays.xyz/", + "wss://pl.peerplays.org/", + "ws://96.46.48.98:18090", + "wss://peerplaysblockchain.net/mainnet/api", + "ws://witness.serverpit.com:8090", + "ws://api.i9networks.net.br:8090", + "wss://node.mainnet.peerblock.trade" + ] + PAIRS = ["BTC-PPY", "HIVE-PPY", "HBD-PPY"] + BASES = ["BTC", "HIVE", "HBD"] + CORE = "PPY" + WHITELIST = [] + + +class PeerplaysTestnetConfig: + """ + configuration details specific to peerplays testnet + """ + + ACCOUNT = "litepresence1" + NODES = ["wss://testnet.peerplays.download/api"] + PAIRS = ["TEST-HIVE", "TEST-HBD"] + BASES = ["HIVE", "HBD", "ABC", "DEFG"] + CORE = "TEST" + WHITELIST = [] + + +class BitsharesConfig: + """ + ╔═════════════════════════════╗ + ║ HUMMINGBOT GRAPHENE ║ + ║ ╔╗ ╦╔╦╗╔═╗╦ ╦╔═╗╦═╗╔═╗╔═╗ ║ + ║ ╠╩╗║ ║ ╚═╗╠═╣╠═╣╠╦╝║╣ ╚═╗ ║ + ║ ╚═╝╩ ╩ ╚═╝╩ ╩╩ ╩╩╚═╚═╝╚═╝ ║ + ║ DEX MARKET MAKING CONNECTOR ║ + ╚═════════════════════════════╝ + configuration details specific to bitshares mainnet + """ + + ACCOUNT = "litepresence1" + NODES = [ + "wss://api.bts.mobi/wss", + "wss://eu.nodes.bitshares.ws/ws", + "wss://cloud.xbts.io/wss", + "wss://dex.iobanker.com/wss", + "wss://bts.mypi.win/wss", + "wss://node.xbts.io/wss", + "wss://public.xbts.io/ws", + "wss://btsws.roelandp.nl/wss", + "wss://api-us.61bts.com/wss", + "wss://api.dex.trading/wss", + ] + PAIRS = ["BTS-HONEST", "BTS-HONEST.USD", "BTS-XBTSX.USDT"] + BASES = ["HONEST", "XBTSX", "GDEX", "BTWTY", "IOB"] + CORE = "BTS" + WHITELIST = [] + + +class BitsharesTestnetConfig: + """ + configuration details specific to bitshares testnet + """ + + ACCOUNT = "" + NODES = [ + "wss://testnet.bitshares.im/ws", + "wss://testnet.dex.trading/", + "wss://testnet.xbts.io/ws", + "wss://api-testnet.61bts.com/ws", + ] + PAIRS = ["TEST-USD", "TEST-CNY"] + BASES = ["USD", "CNY"] + CORE = "TEST" + WHITELIST = [] diff --git a/hummingbot/connector/exchange/graphene/graphene_exchange.py b/hummingbot/connector/exchange/graphene/graphene_exchange.py new file mode 100644 index 0000000000..3e5bb2af2f --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_exchange.py @@ -0,0 +1,1587 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation, broad-except, no-member, too-many-lines +# pylint: disable=no-name-in-module, too-many-arguments, too-many-public-methods +# pylint: disable=too-many-locals, too-many-instance-attributes, import-error +# pylint: disable=too-many-statements, useless-super-delegation +# pylint: disable=too-many-instance-attributes, +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_exchange v1.0.0 +~ +""" +# STANDARD MODULES +import asyncio +import json +import logging +import os +import time +from decimal import Decimal +from itertools import permutations +from multiprocessing import Process +from threading import Thread +from typing import AsyncIterable, Dict, List, Optional + +# METANODE MODULES +from metanode.graphene_metanode_client import GrapheneTrustlessClient +from metanode.graphene_metanode_server import GrapheneMetanode +from metanode.graphene_rpc import RemoteProcedureCall + +# THIRD PARTY MODULES +from psutil import pid_exists + +# HUMMINGBOT MODULES +from hummingbot.client.config.config_helpers import ClientConfigAdapter +from hummingbot.connector.client_order_tracker import ClientOrderTracker +from hummingbot.connector.exchange.graphene import graphene_utils +from hummingbot.connector.exchange.graphene.graphene_auth import GrapheneAuth +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.connector.exchange.graphene.graphene_order_book_tracker import GrapheneOrderBookTracker +from hummingbot.connector.exchange.graphene.graphene_user_stream_tracker import GrapheneUserStreamTracker +from hummingbot.connector.exchange_base import ExchangeBase +from hummingbot.connector.time_synchronizer import TimeSynchronizer +from hummingbot.connector.trading_rule import TradingRule +from hummingbot.core.data_type.cancellation_result import CancellationResult +from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderState, OrderUpdate, TradeUpdate +from hummingbot.core.data_type.limit_order import LimitOrder +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.trade_fee import ( + AddedToCostTradeFee, + DeductedFromReturnsTradeFee, + TokenAmount, + TradeFeeBase, +) +from hummingbot.core.event.events import OrderType, TradeType +from hummingbot.core.network_iterator import NetworkStatus +from hummingbot.core.utils.async_utils import safe_ensure_future +from hummingbot.logger import HummingbotLogger + +# GLOBAL CONSTANTS +DEV = True +LEVEL = 0 # logging verbosity, 0 is least verbose, 2 is most +START_METANODE_CONCURRENTLY = False # Useful for some debugging scenarios + + +def dprint(*data): + """print for development""" + if DEV: + print(*data) + + +def dinput(data): + """input for development""" + out = None + if DEV: + out = input(data) + return out + + +def kill_metanode(): + for domain in ["bitshares", "bitshares_testnet", "peerplays", "peerplays_testnet"]: + constants = GrapheneConstants(domain) + try: + with open(constants.DATABASE_FOLDER + "metanode_flags.json", "w+") as handle: + handle.write(json.dumps({**json.loads(handle.read() or "{}"), domain.replace("_", " "): False})) + except FileNotFoundError: + pass + + +class GrapheneClientOrderTracker(ClientOrderTracker): + """ + add swap_order_id method to ClientOrderTracker + """ + + def __init__( + self, + connector, + ): + # ~ print("GrapheneClientOrderTracker") + super().__init__(connector) + + def swap_id( + self, + client_order_id: Optional[str] = None, + exchange_order_id: Optional[str] = None, + ) -> str: + """ + given client_order_id return exchange_order_id + given exchange_order_id return client_order_id + """ + if client_order_id and client_order_id in self.all_orders: + return self.all_orders[client_order_id].exchange_order_id + + if exchange_order_id: + for order in self.all_orders.values(): + if order.exchange_order_id == exchange_order_id: + return order.client_order_id + return None + + +class GrapheneExchange(ExchangeBase): + """ + the master class which ties together all DEX connector components + """ + + # FIXME move to hummingbot constants + SHORT_POLL_INTERVAL = 5.0 + UPDATE_ORDER_STATUS_MIN_INTERVAL = 10.0 + LONG_POLL_INTERVAL = 120.0 + _logger = None + + def __init__( + self, + client_config_map: "ClientConfigAdapter", + peerplays_wif: str, + peerplays_user: str, + peerplays_pairs: str, + domain: str = "peerplays", + trading_pairs: Optional[List[str]] = None, + trading_required: bool = True, + ): + # ~ print(__class__.__name__) + # ~ print( + # ~ "GrapheneExchange", peerplays_wif, domain, trading_pairs, trading_required + # ~ ) + + self._time_synchronizer = TimeSynchronizer() + self.domain = domain + super().__init__(client_config_map) + self._username = peerplays_user + self._pairs = peerplays_pairs.replace(" ", "").split(",") + self._wif = peerplays_wif + self._trading_pairs = trading_pairs + self._trading_required = trading_required + self._ev_loop = asyncio.get_event_loop() + self._poll_notifier = asyncio.Event() + # Dict[client_order_id:str, count:int] + self._order_not_found_records = {} + # Dict[trading_pair:str, TradingRule] + self._trading_rules = {} + # Dict[trading_pair:str, (maker_fee_percent:Dec, taker_fee_percent:Dec)] + self._trade_fees = {} + self._user_stream_event_listener_task = None + self._trading_rules_polling_task = None + self._user_stream_tracker_task = None + self._status_polling_task = None + self._metanode_process = None + self._starting_metanode = False + self._last_timestamp = 0 + self._last_poll_timestamp = 0 + self._last_update_trade_fees_timestamp = 0 + self._last_trades_poll_graphene_timestamp = 0 + + # initialize Graphene class objects + self.constants = GrapheneConstants(domain) + + os.makedirs(self.constants.DATABASE_FOLDER, exist_ok=True) + + with open(self.constants.DATABASE_FOLDER + domain + "_pairs.txt", "a+") as handle: + handle.seek(0) + contents = handle.read() + new_contents = json.dumps([self._pairs, self._username]) + # if there are new pairs, then change the file and signal to restart the metanode + if contents != new_contents: + handle.seek(0) + handle.write(new_contents) + self.logger().info("Pairs / Username changed. Signalling Metanode Server to restart. (This may take up to 60 seconds)") + self.dev_log(f"(was '{contents}', is now '{new_contents}')") + # Restart metanode + self._signal_metanode(None) + else: + # Start metanode or continue running + self._signal_metanode(True) + + self.constants = GrapheneConstants(domain) + self.constants.process_pairs() + self.dev_log("GrapheneConstants finalized", level=2) + + self.metanode = GrapheneTrustlessClient(self.constants) + self._metanode_server = GrapheneMetanode(self.constants) + self.dev_log("Created metanode server and client instance", level=2) + + if not os.path.isfile(self.constants.chain.DATABASE): + self._metanode_server.sql.restart() + self.dev_log("Database restarted", level=2) + + self._order_tracker: ClientOrderTracker = GrapheneClientOrderTracker( + connector=self + ) + self.dev_log("Created GrapheneClientOrderTracker", level=2) + self._order_book_tracker = GrapheneOrderBookTracker( + trading_pairs=trading_pairs, + domain=domain, + ) + self.dev_log("Created GrapheneOrderBookTracker", level=2) + self._user_stream_tracker = GrapheneUserStreamTracker( + domain=domain, + order_tracker=self._order_tracker, + ) + self.dev_log("Created GrapheneUserStreamTracker", level=2) + self._auth = GrapheneAuth( + wif=peerplays_wif, + domain=self.domain, + ) + self.dev_log("Created GrapheneAuth", level=2) + self.dev_log("End of GrapheneExchange.__init__", level=1) + + def dev_log(self, *args, level=0, **kwargs): + """ + log only in dev mode and where verbosity level is matched + """ + if DEV and LEVEL >= level: + self.logger().info(*args, **kwargs) + + @classmethod + def logger(cls) -> HummingbotLogger: + """ + a classmethod for logging + """ + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + @property + def name(self) -> str: + """ + the name of this graphene blockchain + """ + self.dev_log("name", level=2) + return self.domain + + @property + def order_books(self) -> Dict[str, OrderBook]: + """ + a dictionary keyed by pair of subdicts keyed bids/asks + """ + self.dev_log("order_books", level=2) + return self._order_book_tracker.order_books + + @property + def trading_rules(self) -> Dict[str, TradingRule]: + """ + a TradingRule object specific to a trading pair + """ + self.dev_log("trading_rules", level=2) + return self._trading_rules + + @property + def in_flight_orders(self) -> Dict[str, InFlightOrder]: + """ + a dict of active orders keyed by client id with relevant order tracking info + """ + self.dev_log("in_flight_orders", level=2) + return self._order_tracker.active_orders + + @property + def limit_orders(self) -> List[LimitOrder]: + """ + a list of LimitOrder objects + """ + self.dev_log("limit_orders", level=2) + return [ + in_flight_order.to_limit_order() + for in_flight_order in self.in_flight_orders.values() + ] + + @property + def tracking_states(self) -> Dict[str, any]: + """ + Returns a dictionary associating current active orders client id + to their JSON representation + """ + self.dev_log("tracking_states", level=2) + return {key: value.to_json() for key, value in self.in_flight_orders.items()} + + @property + def order_book_tracker(self) -> GrapheneOrderBookTracker: + """ + the class that tracks bids and asks for each pair + """ + self.dev_log("order_book_tracker", level=2) + return self._order_book_tracker + + @property + def user_stream_tracker(self) -> GrapheneUserStreamTracker: + """ + the class that tracks trades for each pair + """ + self.dev_log("user_stream_tracker", level=2) + return self._user_stream_tracker + + @property + def status_dict(self) -> Dict[str, bool]: + """ + Returns a dictionary with the values of all the conditions + that determine if the connector is ready to operate. + The key of each entry is the condition name, + and the value is True if condition is ready, False otherwise. + """ + self.dev_log("status_dict", level=2) + # self._update_balances() + # ~ self.dev_log(self._account_balances) + return { + "order_books_initialized": self._order_book_tracker.ready, + "account_balance": len(list(self._account_balances.values())) > 0 + if self._trading_required + else True, + "trading_rule_initialized": len(self._trading_rules) > 0, + "metanode_live": 0 < time.time() - self.metanode.timing["blocktime"] < 100, + } + + @property + def ready(self) -> bool: + """ + Returns True if the connector is ready to operate + (all connections established with the DEX). + If it is not ready it returns False. + """ + self.dev_log("ready", level=1) + self.dev_log(self.status_dict, level=1) + return all(self.status_dict.values()) + + @staticmethod + def graphene_order_type(order_type: OrderType) -> str: + """ + LIMIT + """ + return order_type.name.upper() + + @staticmethod + def to_hb_order_type(graphene_type: str) -> OrderType: + """ + OrderType.LIMIT + """ + return OrderType[graphene_type] + + @staticmethod + def supported_order_types(): + """ + a list containing only OrderType.LIMIT + """ + return [OrderType.LIMIT] + + async def _initialize_trading_pair_symbol_map(self): + rpc = RemoteProcedureCall(self.constants, self.constants.chain.NODES) + rpc.printing = False + + whitelisted_bases = self.constants.chain.BASES + whitelist = [] + # for each whitelisted base + for base in whitelisted_bases: + # search the blockchain for tokens starting with that base + whitelist.extend([i["symbol"] for i in rpc.list_assets(base) if "for_liquidity_pool" not in i]) + # if the list ends with a token containing the base, there may be more + while whitelist[-1].startswith(base): + # so keep going + whitelist.extend([i["symbol"] for i in rpc.list_assets(whitelist[-1]) if "for_liquidity_pool" not in i]) + # make sure there are no duplicates, that they are all actually whitelisted, and sort them + whitelist = sorted(list({i for i in whitelist if any(i.startswith(j + ".") for j in whitelisted_bases)})) + whitelist.extend(whitelisted_bases + [self.constants.chain.CORE, *self.constants.chain.WHITELIST]) + # permutate all possible pairs and join in hummingbot format + whitelist = ["-".join(i) for i in permutations(whitelist, 2)] + # output + self._set_trading_pair_symbol_map({i: i for i in whitelist}) + + async def start_network(self): + """ + Start all required tasks to update the status of the connector. + Those tasks include: + - The order book tracker + - The polling loop to update the trading rules + - The polling loop to update order status and balance status using REST API + (backup for main update process) + """ + level = 0 + while not pid_exists(self._check_metanode("pid")): + self.dev_log("Metanode is not running, not starting network until...", level) + await asyncio.sleep(1) + level = 1 # only repeat the message in verbose mode + + self.dev_log("GrapheneExchange.start_network") + self._order_book_tracker.start() + self.dev_log("Order Book Started") + self._trading_rules_polling_task = safe_ensure_future( + self._trading_rules_polling_loop() + ) + self.dev_log("Trading Rules Started") + # ~ if self._trading_required: + self._status_polling_task = safe_ensure_future(self._status_polling_loop()) + self.dev_log("Status Polling Started") + self._user_stream_tracker_task = safe_ensure_future( + self._user_stream_tracker.start() + ) + self.dev_log("User Stream Tracker Started") + self._user_stream_event_listener_task = safe_ensure_future( + self._user_stream_event_listener() + ) + self.dev_log("User Stream Listener Started") + + self.dev_log(f"Authenticating {self.domain}...") + msg = ( + "Authenticated" if self._auth.login()["result"] is True else "Login Failed" + ) + self.dev_log(msg) + + async def stop_network(self): + """ + This function is executed when the connector is stopped. + It perform a general cleanup and stops all background + tasks that require the connection with the DEX to work. + """ + await asyncio.sleep(0.1) + self.dev_log("GrapheneExchange.stop_network") + self.dev_log("Waiting for cancel_all...") + await asyncio.sleep(30) + self._last_timestamp = 0 + self._last_poll_timestamp = 0 + self._order_book_tracker.stop() + self._poll_notifier = asyncio.Event() + if self._status_polling_task is not None: + self._status_polling_task.cancel() + if self._user_stream_tracker_task is not None: + self._user_stream_tracker_task.cancel() + if self._trading_rules_polling_task is not None: + self._trading_rules_polling_task.cancel() + self._status_polling_task = self._user_stream_tracker_task = None + await asyncio.sleep(0.1) + try: + print("stopping metanode...") + self._signal_metanode(False) + if self.metanode: + print("waiting for metanode") + self._metanode_process.join() + except Exception: + self.dev_log("Failed to wait for metanode, must be in another instance.") + + def _signal_metanode(self, signal): + with open(self.constants.DATABASE_FOLDER + "metanode_flags.json", "w+") as handle: + handle.write(json.dumps({**json.loads(handle.read() or "{}"), self.domain.replace("_", " "): signal})) + handle.close() + + def _check_metanode(self, flag): + if flag == "signal": + try: + with open(self.constants.DATABASE_FOLDER + "metanode_flags.json", "r") as handle: + ret = json.loads(handle.read())[self.domain.replace("_", " ")] + handle.close() + except FileNotFoundError: + ret = None + elif flag == "pid": + try: + with open(self.constants.DATABASE_FOLDER + "metanode_pid", "r") as handle: + ret = int(handle.read()) + handle.close() + except FileNotFoundError: + ret = -1 + return ret + + def _deploy_metanode(self): + self._starting_metanode = True + self.logger().info("Deploying Metanode Server Process, please wait...") + self.logger().info( + "ALERT: Check your system monitor to ensure hardware compliance, " + "Metanode is cpu intensive, requires ram, and rapid read/write" + ) + self.logger().info( + "This may hang for a moment (less than 2 minutes) while starting " + "the Metanode, please be patient." + ) + try: + self._signal_metanode(False) + self._metanode_process.join() + except Exception: + pass + self._signal_metanode(True) + self._metanode_process = Process(target=self._metanode_server.deploy) + self._metanode_process.start() + with open(self.constants.DATABASE_FOLDER + "metanode_pid", "w") as handle: + try: + handle.write(str(self._metanode_process.pid)) + except AttributeError: + handle.write("-2") + handle.close() + # do not proceed until metanode is running + patience = 10 + while True: + patience -= 1 + msg = f"Metanode Server Initializing... patience={patience}" + if patience == -10: + msg = ( + "I am out of patience.\n" + + "It appears Metanode FAILED, check configuration and that" + + " DEV_PAUSE mode is off." + ) + self.dev_log(msg) + self._starting_metanode = False + break + self.dev_log(msg) + try: + # wait until less than one minute stale + blocktime = self.metanode.timing["blocktime"] + latency = time.time() - blocktime + if 0 < latency < 60: + msg = f"Metanode Connected, latency {latency:.2f}" + self.dev_log(msg) + self._starting_metanode = None + time.sleep(10) + break + except Exception as error: + self.dev_log(error) + time.sleep(6) + + async def check_network(self) -> NetworkStatus: + """ + ensure metanode blocktime is not stale, if it is, restart the metanode + """ + + self.dev_log(str(self.in_flight_orders), level=1) + self.dev_log("check_network", level=1) + status = NetworkStatus.NOT_CONNECTED + self.dev_log("Checking Network...") + try: + try: + blocktime = self.metanode.timing["blocktime"] + except IndexError: # the metanode has not created a database yet + blocktime = 0 + latency = time.time() - blocktime + metanode_signal, metanode_pid = self._check_metanode("signal"), self._check_metanode("pid") + metanode_exists = pid_exists(metanode_pid) + + # if the latency is over one minute presume the metanode failed and restart it + if latency > 60: + metanode_exists = False + + self.dev_log( + f"Current metanode signal: {metanode_signal}, " + f"pid: {metanode_pid}, " + f"exists flag: {metanode_exists}, " + f"and latency: {latency:.2f}" + ) + + if metanode_signal is None and metanode_exists: + self.dev_log("Metanode has been signalled to restart, but it still exists. Attempting to kill...") + os.kill(metanode_pid, 15) # SIGTERM, "default signal to terminate a process gracefully" + elif metanode_exists: + self.dev_log(f"Metanode Running, latency {latency:2f}") + status = NetworkStatus.CONNECTED + elif not self._username or not self._wif: + self.logger().info("Username/WIF not entered. Not starting Metanode Server.") + elif metanode_signal is False: + self.dev_log("Metanode has been signalled to shutdown, not restarting.") + elif self._starting_metanode: + self.dev_log("Metanode is being started in another thread, passing on...", level=1) + else: + # deploy metanode in a seperate thread, and + if START_METANODE_CONCURRENTLY: + # continue onward unhindered + Thread(target=self._deploy_metanode).start() + # or + else: + # asynchronously await completion + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._deploy_metanode) + + # presume connected for now, latency + pid check next iteration will confirm this + status = NetworkStatus.CONNECTED + except asyncio.CancelledError: + self.logger().exception(f"asyncio.CancelledError {__name__}") + except Exception as error: + self.logger().exception(f"check network failed {__name__} {error}") + return status + + def restore_tracking_states(self, saved_states: Dict[str, any]): + """ + Restore in-flight orders from saved tracking states, + this is so the connector result pick up on where it left off + when it disconnects. + :param saved_states: The saved tracking_states. + """ + self.dev_log("restore_tracking_states", level=2) + self._order_tracker.restore_tracking_states(tracking_states=saved_states) + + def tick(self, timestamp: float): + """ + Includes the logic processed every time a new tick happens in the bot. + It enables execution of the status update polling loop using an event. + """ + self.dev_log("tick", level=1) + now = time.time() + poll_interval = ( + self.SHORT_POLL_INTERVAL + if now - self.user_stream_tracker.last_recv_time > 60.0 + else self.LONG_POLL_INTERVAL + ) + last_tick = int(self._last_timestamp / poll_interval) + current_tick = int(timestamp / poll_interval) + + if current_tick > last_tick: + if not self._poll_notifier.is_set(): + self._poll_notifier.set() + self._last_timestamp = timestamp + + def get_order_book(self, trading_pair: str) -> OrderBook: + """ + Returns the current order book for a particular market + :param trading_pair: BASE-QUOTE + """ + self.dev_log("get_order_book", level=2) + if trading_pair not in self._order_book_tracker.order_books: + inverted_pair = "-".join(trading_pair.split("-")[::-1]) + if inverted_pair in self._order_book_tracker.order_books: + trading_pair = inverted_pair + return self._order_book_tracker.order_books[trading_pair] + else: + raise ValueError(f"No order book exists for '{trading_pair}'.") + return self._order_book_tracker.order_books[trading_pair] + + def start_tracking_order( + self, + order_id: str, + exchange_order_id: Optional[str], + trading_pair: str, + trade_type: TradeType, + price: Decimal, + amount: Decimal, + order_type: OrderType, + ): + """ + Starts tracking an order by adding it to the order tracker. + :param order_id: the order identifier + :param exchange_order_id: the identifier for the order in the DEX + :param trading_pair: BASE-QUOTE + :param trade_type: the type of order (buy or sell) + :param price: the price for the order + :param amount: the amount for the order + :order type: type of execution for the order (MARKET, LIMIT, LIMIT_MAKER) + """ + self.dev_log("start_tracking_order", level=2) + self._order_tracker.start_tracking_order( + InFlightOrder( + client_order_id=order_id, + exchange_order_id=exchange_order_id, + trading_pair=trading_pair, + order_type=order_type, + trade_type=trade_type, + amount=amount, + creation_timestamp=int(time.time() * 1e3), + price=price, + ) + ) + + def stop_tracking_order(self, order_id: str): + """ + Stops tracking an order + :param order_id: The id of the order that will not be tracked any more + """ + self.dev_log("stop_tracking_order", level=2) + self._order_tracker.stop_tracking_order(client_order_id=order_id) + + def get_order_price_quantum(self, trading_pair: str, *_) -> Decimal: + """ + Used by quantize_order_price() in _limit_order_create() + Returns a price step, a minimum price increment for a given trading pair. + :param trading_pair: the trading pair to check for market conditions + :param price: the starting point price + """ + self.dev_log("get_order_price_quantum", level=2) + trading_rule = self._trading_rules[trading_pair] + return trading_rule.min_price_increment + + def get_order_size_quantum(self, trading_pair: str, *_) -> Decimal: + """ + Used by quantize_order_price() in _limit_order_create() + Returns an order amount step, a minimum amount increment for a given pair. + :param trading_pair: the trading pair to check for market conditions + :param order_size: the starting point order price + """ + self.dev_log("get_order_size_quantum", level=2) + trading_rule = self._trading_rules[trading_pair] + return trading_rule.min_base_amount_increment + + def quantize_order_amount( + self, + trading_pair: str, + amount: Decimal, + side: str, + price: Decimal = Decimal(0), + ) -> Decimal: + """ + Applies the trading rules to calculate the correct order amount for the market + :param trading_pair: the token pair for which the order will be created + :param amount: the intended amount for the order + :param price: the intended price for the order + :return: the quantized order amount after applying the trading rules + """ + self.dev_log("quantize_order_amount", level=2) + trading_rule = self._trading_rules[trading_pair] + quantized_amount: Decimal = self.quantize_order_amount_by_side(trading_rule, amount, side) + + # Check against min_order_size and min_notional_size. + # If not passing either check, return 0. + min_size = trading_rule.min_order_value + + self.dev_log(f"QUANTIZE_ORDER_AMOUNT: quantized amount {quantized_amount} minimum size {min_size}") + + if quantized_amount < min_size: + return Decimal(0) + + if price == Decimal(0): + current_price: Decimal = self.get_price(trading_pair, False) + notional_size = current_price * quantized_amount + else: + notional_size = price * quantized_amount + + min_notional_size = trading_rule.min_order_size + + self.dev_log(f"QUANTIZE_ORDER_AMOUNT: notional size {notional_size} minimum size plus 1% {min_notional_size * Decimal('1.01')}") + # Add 1% as a safety factor in case the prices changed while making the order. + if notional_size < min_notional_size * Decimal("1.01"): + return Decimal(0) + + return quantized_amount + + def get_fee( + self, + base_currency: str, + quote_currency: str, + _, # order_type: OrderType, + order_side: TradeType, # TradeType.BUY TradeType.SELL + __, # amount: Decimal, + ___, # price: Decimal = Decimal("nan"), + is_maker: Optional[bool] = None, + ) -> TradeFeeBase: + """ + Calculates the estimated fee an order would pay + Graphene fees include a added flat transaction fee paid in core token; 1.3.0 + AND a deducted percent based market fees paid in currency RECEIVED + market fees MAY have maker/taker functionality + """ + class GrapheneTradeFee(TradeFeeBase): + """ + a trade fee class which includes both Added and Deducted fees + """ + + def get_fee_impact_on_order_cost(_): + """ + Added Fees + """ + return AddedToCostTradeFee.get_fee_impact_on_order_cost + + def get_fee_impact_on_order_returns(_): + """ + Deducted Fees + """ + return DeductedFromReturnsTradeFee.get_fee_impact_on_order_returns + + def type_descriptor_for_json(_): + ... + + self.dev_log("get_fee", level=2) + account = dict(self.metanode.account) # DISCRETE SQL QUERY + objects = dict(self.metanode.objects) # DISCRETE SQL QUERY + assets = dict(self.metanode.assets) # DISCRETE SQL QUERY + tx_currency = objects["1.3.0"]["name"] + tx_amount = account["fees_account"]["create"] + # you pay trade fee on the currency you receive in the transaction + trade_currency = quote_currency + maker_pct = assets[quote_currency]["fees_asset"]["fees"]["maker"] + taker_pct = assets[quote_currency]["fees_asset"]["fees"]["taker"] + if order_side == TradeType.BUY: + trade_currency = base_currency + maker_pct = assets[base_currency]["fees_asset"]["fees"]["maker"] + taker_pct = assets[base_currency]["fees_asset"]["fees"]["taker"] + trade_pct = maker_pct if is_maker else taker_pct + # build a TradeFeeBase class object + flat_fee = TokenAmount(token=tx_currency, amount=Decimal(tx_amount)) + fee = GrapheneTradeFee( + flat_fees=[flat_fee], + percent=Decimal(trade_pct), + # handle TradeFeeBase warning; do not specify token if its quote token + percent_token=trade_currency if trade_currency != quote_currency else None, + ) + # ############################################################################## + # FIXME the hummingbot binance reference is a path to deprecation warning + # ############################################################################## + # there appears to be no functional reference material, see: + # ~ + # ~ BitshareExchange + # ~ ExchangeBase + # ~ ConnectorBase + # ~ estimate_fee_pct + # ~ core.utils.estimate_fee.estimate_fee < binance ends here not implemented + # ############################################################################## + # FIXME just return ZERO like this? peer review please + # return DeductedFromReturnsTradeFee(percent=self.estimate_fee_pct(False)) + # ############################################################################## + return fee + + def buy( + self, + trading_pair: str, + amount: Decimal, + order_type: OrderType = OrderType.LIMIT, + price: Decimal = Decimal("nan"), + **__, + ) -> str: + """ + Creates a promise to create a buy order using the parameters. + :param trading_pair: the token pair to operate with + :param amount: the order amount + :param order_type: all graphene orders are LIMIT type + :param price: the order price + :return: the id assigned by the connector to the order (the client id) + """ + self.dev_log("buy", level=2) + order_id = graphene_utils.get_new_client_order_id( + is_buy=True, trading_pair=trading_pair + ) + safe_ensure_future( + self._limit_order_create( + TradeType.BUY, order_id, trading_pair, amount, order_type, price + ) + ) + return order_id + + def sell( + self, + trading_pair: str, + amount: Decimal, + order_type: OrderType = OrderType.LIMIT, + price: Decimal = Decimal("nan"), + **__, + ) -> str: + """ + Creates a promise to create a sell order using the parameters. + :param trading_pair: the token pair to operate with + :param amount: the order amount + :param order_type: all graphene orders are LIMIT type + :param price: the order price + :return: the id assigned by the connector to the order (the client id) + """ + self.dev_log("sell", level=2) + order_id = graphene_utils.get_new_client_order_id( + is_buy=False, trading_pair=trading_pair + ) + safe_ensure_future( + self._limit_order_create( + TradeType.SELL, order_id, trading_pair, amount, order_type, price + ) + ) + return order_id + + def cancel(self, trading_pair: str, order_id: str): + """ + Creates a promise to cancel an order in the DEX + :param trading_pair: the trading pair the order to cancel operates with + :param order_id: the client id of the order to cancel + :return: the client id of the order to cancel + """ + self.dev_log("cancel", level=2) + safe_ensure_future( + self._limit_order_cancel( + trading_pair=trading_pair, + client_order_id=order_id, + ) + ) + return order_id + + async def cancel_all(self, _) -> List[CancellationResult]: + """ + Cancels all currently active orders. + The cancellations are batched at the core level into groups of 20 per tx + Used by bot's top level stop and exit commands + (cancelling outstanding orders on exit) + :param timeout_seconds: the maximum time in seconds the cancel logic should run + :return: a list of CancellationResult instances, one for each of the order + """ + self.dev_log("cancel_all", level=2) + # get an order id set of known open orders hummingbot is tracking + # change each OrderState to PENDING_CANCEL + await asyncio.sleep(0.01) + + hummingbot_open_client_ids = { + o.client_order_id for o in self.in_flight_orders.values() if not o.is_done + } + + rpc = RemoteProcedureCall(self.constants) + open_client_ids = [j for j in [self._order_tracker.swap_id(exchange_order_id=i) for i in rpc.open_order_ids()] if j is not None] + + await asyncio.sleep(0.01) + + # disregard unnecessary open orders; blockchain is gospel, not hummingbot + for order_id in hummingbot_open_client_ids: + if order_id not in open_client_ids: + self.stop_tracking_order(order_id) + + msg = f"open_client_ids {len(open_client_ids)} {open_client_ids}" + self.dev_log(msg) + if not open_client_ids: + return [] + open_exchange_ids = { + self._order_tracker.swap_id(i) + for i in open_client_ids + if self._order_tracker.swap_id(i) is not None + } + open_ids = { + self._order_tracker.swap_id(i): i + for i in open_client_ids + if self._order_tracker.swap_id(i) is not None + } + await asyncio.sleep(0.01) + # log open orders in client and DEX terms + msg = f"open_exchange_ids {len(open_exchange_ids)} {open_exchange_ids}" + self.dev_log(msg) + await asyncio.sleep(0.01) + for order_id in open_client_ids: + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + trading_pair=self.in_flight_orders[order_id].trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.PENDING_CANCEL, + ) + self._order_tracker.process_order_update(order_update) + await asyncio.sleep(0.01) + + cancelled_exchange_ids = [] + for pair in self._trading_pairs: + # build a cancel all operation using the broker(order) method + order = json.loads(self._auth.prototype_order(pair)) + # order["edicts"] = [{"op": "cancel", "ids": list(open_exchange_ids)}] + order["edicts"] = [{"op": "cancel", "ids": ["1.7.X"]}] + + await asyncio.sleep(0.01) + # cancel all and get a cancellation result list of DEX order ids + self.dev_log(order["edicts"]) + cancelled_exchange_ids.extend((await self._broker(order))["result"]) + + msg = ( + f"cancelled_exchange_ids {len(cancelled_exchange_ids)}" + + f" {cancelled_exchange_ids}" + ) + self.dev_log(msg) + # swap the list to hummingbot client ids + cancelled_client_ids = [open_ids[i] for i in cancelled_exchange_ids if i in open_ids] + await asyncio.sleep(0.01) + # log cancelled orders in client and DEX terms + msg = f"cancelled_client_ids {len(cancelled_client_ids)} {cancelled_client_ids}" + self.dev_log(msg) + + await asyncio.sleep(0.01) + # create a list of successful CancellationResult + # change each OrderState to CANCELED + successful_cancellations = [] + for order_id in cancelled_client_ids: + successful_cancellations.append(CancellationResult(order_id, True)) + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + trading_pair=self.in_flight_orders[order_id].trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.CANCELED, + ) + self._order_tracker.process_order_update(order_update) + self.stop_tracking_order(order_id) + msg = ( + f"successful_cancellations {len(successful_cancellations)}" + + f" {successful_cancellations}" + ) + self.dev_log(msg) + + # create a list of apparently failed CancellationResult + # change each OrderState back to OPEN + await asyncio.sleep(0.01) + failed_cancellations = [] + for order_id in open_client_ids: # client order ids + if order_id not in cancelled_client_ids: + failed_cancellations.append(CancellationResult(order_id, False)) + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + trading_pair=self.in_flight_orders[order_id].trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.OPEN, + ) + self._order_tracker.process_order_update(order_update) + await asyncio.sleep(0.01) + # log successful and failed cancellations + + msg = ( + f"failed_cancellations {len(failed_cancellations)}" + + f" {failed_cancellations}" + ) + self.dev_log(msg) + await asyncio.sleep(0.01) + # join the lists and return + return successful_cancellations + failed_cancellations + + async def _broker(self, order): + self.dev_log("self._broker", level=1) + ret = {} + borker = Thread( + target=self._auth.broker, + args=( + order, + ret, + ), + ) + borker.start() + self.dev_log(ret, level=1) + while not ret: + await asyncio.sleep(1) + self.dev_log("Waiting for manualSIGNING", level=1) + self.dev_log(ret, level=1) + return ret + + def quantize_order_amount_by_side(self, trading_rule, amount, side): + """ + Applies trading rule to quantize order amount by side. + """ + order_size_quantum = ( + trading_rule.min_base_amount_increment if side == "buy" + else trading_rule.min_quote_amount_increment + ) + return (amount // order_size_quantum) * order_size_quantum + + async def _limit_order_create( + self, + trade_type: TradeType, + order_id: str, + trading_pair: str, + amount: Decimal, + order_type: OrderType, + price: Optional[Decimal] = Decimal("NaN"), + ): + """ + Creates a an order in the DEX using the parameters to configure it + :param trade_type: the side of the order (BUY of SELL) + :param order_id: the id that should be assigned to the order (the client id) + :param trading_pair: the token pair to operate with + :param amount: the order amount + :param order_type: the type of order to create (MARKET, LIMIT, LIMIT_MAKER) + :param price: the order price + """ + self.dev_log("_limit_order_create", level=2) + self.dev_log("############### LIMIT ORDER CREATE ATTEMPT ###############") + self.dev_log(trade_type) + self.dev_log(order_type) + self.dev_log(order_id) + self.dev_log(trading_pair) + self.dev_log(amount) + self.dev_log(price) + if self._wif == "": + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + exchange_order_id=order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.OPEN, + ) + self._order_tracker.process_order_update(order_update) + return + # get trading rules and normalize price and amount + trading_rule: TradingRule = self._trading_rules[trading_pair] + price = self.quantize_order_price(trading_pair, price) + quantize_amount_price = Decimal("0") if price.is_nan() else price + amount = self.quantize_order_amount( + trading_pair=trading_pair, + amount=amount, + side="buy" if trade_type == TradeType.BUY else "sell", + price=quantize_amount_price, + ) + # create an inflight order keyed by client order_id + self.start_tracking_order( + order_id=order_id, + exchange_order_id=None, + trading_pair=trading_pair, + trade_type=trade_type, + price=price, + amount=amount, + order_type=order_type, + ) + # if the amount is too little disregard the order + # update tracking status to FAILED + if amount < trading_rule.min_order_value: + msg = ( + f"{trade_type.name.title()} order amount {amount} is lower than the" + f" minimum order size {trading_rule.min_order_value}. The order will not" + " be created." + ) + self.logger().warning(msg) + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.FAILED, + ) + self._order_tracker.process_order_update(order_update) + return + # format an order, broadcast to the blockchain + # update tracking status to OPEN + try: + order = json.loads(self._auth.prototype_order(trading_pair)) + self.dev_log(trade_type) + order["edicts"] = [ + { + "op": "buy" if trade_type == TradeType.BUY else "sell", + "amount": float(amount), + "price": float(price), + "expiration": 0, + }, + ] + self.dev_log(order["edicts"]) + await asyncio.sleep(0.01) + result = await self._broker(order) + # ~ {"method": "notice", + # ~ "params": [1, [{ + # ~ "id": "9c91cd07aa2844473cc3c6047ec2c4f7ce40c8c1", + # ~ "block_num": 66499124, + # ~ "trx_num": 0, + # ~ "trx": { + # ~ "ref_block_num": 45619, + # ~ "ref_block_prefix": 3851304488, + # ~ "expiration": "2022-02-20T20:12:06", + # ~ "operations": [ + # ~ [1, { + # ~ "fee": { + # ~ "amount": 48260, + # ~ "asset_id": "1.3.0" + # ~ }, + # ~ "seller": "1.2.743179", + # ~ "amount_to_sell": { + # ~ "amount": 5, + # ~ "asset_id": "1.3.5640" + # ~ }, + # ~ "min_to_receive": { + # ~ "amount": 1000000, + # ~ "asset_id": "1.3.0" + # ~ }, + # ~ "expiration": "2096-10-02T07:06:40", + # ~ "fill_or_kill": false, + # ~ "extensions": []}], + # ~ [1, { + # ~ "fee": { + # ~ "amount": 48260, + # ~ "asset_id": "1.3.0" + # ~ }, + # ~ "seller": "1.2.743179", + # ~ "amount_to_sell": { + # ~ "amount": 1000000, + # ~ "asset_id": "1.3.0" + # ~ }, + # ~ "min_to_receive": { + # ~ "amount": 5, + # ~ "asset_id": "1.3.5640" + # ~ }, + # ~ "expiration": "2096-10-02T07:06:40", + # ~ "fill_or_kill": false, + # ~ "extensions": []}]], + # ~ "extensions": [], + # ~ "signatures": [ + # ~ "1f1fa0acde...d80f8254c6" + # ~ ], + # ~ "operation_results": [ + # ~ [1, {"1.7.490017546"], + # ~ [1, {"1.7.490017547" ]]}}]]} + ############################################################################ + if isinstance(result, dict) and result["status"]: + exchange_order_id = result["result"]["params"][1][0]["trx"][ + "operation_results" + ][0][1] + ######################################################################## + # update_timestamp = int(result["blocknum"]) + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + exchange_order_id=exchange_order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.OPEN, + ) + self._order_tracker.process_order_update(order_update) + else: + raise ValueError("DEX did not return an order id") + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + # if anything goes wrong log stack trace + # update tracking status to FAILED + except Exception as error: + self.logger().network( + "Error submitting order to Graphene for " + f"{amount} {trading_pair} " + f"{price}.", + exc_info=True, + app_warning_msg=str(error), + ) + order_update: OrderUpdate = OrderUpdate( + client_order_id=order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.FAILED, + ) + self._order_tracker.process_order_update(order_update) + + async def _limit_order_cancel( + self, + trading_pair: str, + client_order_id: str, + ) -> list: # of exchange_order_id + """ + Requests the DEX to cancel an active order + :param trading_pair: the trading pair the order to cancel operates with + :param client_order_id: the client id of the order to cancel + """ + self.dev_log(f"CANCELLING ORDER #{client_order_id}") + if self._wif == "": + order_update: OrderUpdate = OrderUpdate( + client_order_id=client_order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.CANCELED, + ) + self._order_tracker.process_order_update(order_update) + self.dev_log("############# PAPER #############") + self.dev_log("ORDER STATUS UPDATED TO CANCELLED") + self.dev_log("#################################") + return [client_order_id] + self.dev_log("_limit_order_cancel", level=2) + result = None + tracked_order = self._order_tracker.fetch_tracked_order(client_order_id) + # if this order was placed by hummingbot + if tracked_order is not None: + # change its status to pending cancellation + order_update: OrderUpdate = OrderUpdate( + client_order_id=client_order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.PENDING_CANCEL, + ) + self._order_tracker.process_order_update(order_update) + # attempt to cancel the order + try: + order = json.loads(self._auth.prototype_order(trading_pair)) + order["header"]["wif"] = self._wif + order["edicts"] = [ + {"op": "cancel", "ids": [tracked_order.exchange_order_id]} + ] + result = await self._broker(order) + self.dev_log(f"CANCELLED ORDER #{client_order_id}") + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + msg = ( + "There was a an error when requesting cancellation of order " + f"{client_order_id}" + ) + self.logger().exception(msg) + raise + ################################################################################ + # if the result from the cancellation attempt contains the DEX order id + # update the status to CANCELLED + self.dev_log(result["result"]) + if ( + isinstance(result["result"], list) + and result["result"] + and result["result"][0] == tracked_order.exchange_order_id + ): + order_update: OrderUpdate = OrderUpdate( + client_order_id=client_order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.CANCELED, + ) + self._order_tracker.process_order_update(order_update) + self.dev_log("ORDER STATUS UPDATED TO CANCELLED") + # otherwise return the order state to open + else: + order_update: OrderUpdate = OrderUpdate( + client_order_id=client_order_id, + trading_pair=trading_pair, + update_timestamp=int(time.time() * 1e3), + new_state=OrderState.OPEN, + ) + self.dev_log("ORDER STATUS RETURNED TO OPEN") + self._order_tracker.process_order_update(order_update) + # return the list of cancellation results + return result["result"] + ################################################################################ + + async def _status_polling_loop(self): + """ + Performs all required operations to keep the connector synchronized + with the DEX. It also updates the time synchronizer. + Executes when the _poll_notifier event is enabled by the `tick` function. + """ + while True: + try: + self.dev_log("###########STATUS#POLLING#LOOP#OCCOURING##########", level=1) + while not self._poll_notifier.is_set(): + await asyncio.sleep(1) + self.dev_log("LOOP IS " + str(self._poll_notifier), level=2) + # ~ await self._poll_notifier.wait() + self.dev_log("###################NOTIFIER#######################", level=1) + await self._update_time_synchronizer() + self.dev_log("###################TIME###########################", level=1) + await self._update_balances() + self.dev_log("###################BALANCES:######################", level=1) + self.dev_log(self._account_balances) + self._last_poll_timestamp = self.current_timestamp + self.dev_log("###################TIMESTAMP######################", level=1) + await asyncio.sleep(1) + self.dev_log("###################END#LOOP#######################", level=1) + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + self.logger().network( + "Unexpected error while fetching updates.", + exc_info=True, + app_warning_msg=( + "Could not fetch account updates. " + "Check metanode and network connection." + ), + ) + except asyncio.CancelledError: + break + finally: + self._poll_notifier = asyncio.Event() + + async def _trading_rules_polling_loop(self): + """ + Performs all required operations to keep the connector synchronized + with the DEX. It also updates the time synchronizer. + Executes when the _poll_notifier event is enabled by the `tick` function. + """ + self.dev_log("_trading_rules_polling_loop", level=2) + + while True: + try: + await asyncio.sleep(1) + await self._update_trading_rules() + except asyncio.CancelledError: + break + except Exception: + self.logger().network( + "Unexpected error while fetching updates.", + exc_info=True, + app_warning_msg=( + "Could not fetch account updates. " + "Check metanode and network connection." + ), + ) + + async def _update_trading_rules(self): + """ + gather DEX info from metanode.assets and pass on to _trading_rules + """ + self.dev_log("_update_trading_rules", level=2) + try: + graphene_max = self.constants.core.GRAPHENE_MAX + metanode_assets = self.metanode.assets + rules = [] + for trading_pair in self.constants.chain.PAIRS: + base, quote = trading_pair.split("-") + base_min = self.constants.core.DECIMAL_SATOSHI + quote_min = self.constants.core.DECIMAL_SATOSHI + supply = self.constants.core.DECIMAL_SATOSHI + try: + base_min = Decimal(1) / 10 ** metanode_assets[base]["precision"] + quote_min = Decimal(1) / 10 ** metanode_assets[quote]["precision"] + supply = Decimal(metanode_assets[base]["supply"]) + except Exception: + pass + rules.append( + TradingRule( + trading_pair=trading_pair, + min_order_size=quote_min, + max_order_size=supply, + min_price_increment=Decimal(1) / int(graphene_max), + min_base_amount_increment=base_min, + min_quote_amount_increment=quote_min, + min_notional_size=base_min, + min_order_value=base_min, + max_price_significant_digits=Decimal(graphene_max), + supports_limit_orders=True, + supports_market_orders=False, # OrderType.LIMIT *only* + buy_order_collateral_token=None, + sell_order_collateral_token=None, + ) + ) + self._trading_rules.clear() + for trading_rule in rules: + self._trading_rules[trading_rule.trading_pair] = trading_rule + except Exception as error: + msg = f"Error updating trading rules: {error.args}" + self.logger().exception(msg) + + async def _user_stream_event_listener(self): + """ + This functions runs in background continuously processing the events + received from the DEX by the user stream data source. + It keeps reading events from the queue until the task is interrupted. + The events received are order updates and trade events. + """ + self.dev_log("_user_stream_event_listener", level=2) + + async def iter_user_event_queue() -> AsyncIterable[Dict[str, any]]: + """ + fetch events from the user stream + """ + while True: + try: + user_streamer = await self._user_stream_tracker.user_stream.get() + self.dev_log("########################") + self.dev_log(user_streamer) + self.dev_log("########################") + yield user_streamer + except asyncio.CancelledError: + break + except Exception: + self.logger().network( + "Unknown error. Retrying after 1 seconds.", + exc_info=True, + app_warning_msg=( + "Could not fetch user events from Graphene." + "Check network connection." + ), + ) + await asyncio.sleep(1.0) + finally: + await asyncio.sleep(0.1) + + async for event_message in iter_user_event_queue(): + try: + # localize and type cast values common to all event_messages + trading_pair = str(event_message["trading_pair"]) + execution_type = str(event_message["execution_type"]) + client_order_id = str(event_message["client_order_id"]) + exchange_order_id = str(event_message["exchange_order_id"]) + # process trade event messages + if execution_type == "FILL": + tracked_order = self._order_tracker.fetch_order( + client_order_id=client_order_id + ) + if tracked_order is not None: + # localize and type cast fill order event message values + trade_id = str(event_message["trade_id"]) + fee_asset = str(event_message["fee_asset"]) + fee_paid = Decimal(event_message["fee_paid"]) + fill_price = Decimal(event_message["price"]) + fill_timestamp = int(event_message["fill_timestamp"]) + fill_base_amount = Decimal(event_message["fill_base_amount"]) + # estimate the quote amount + fill_quote_amount = fill_base_amount * fill_price + # process a trade update + trade_update = TradeUpdate( + client_order_id=client_order_id, + exchange_order_id=exchange_order_id, + trading_pair=trading_pair, + fill_base_amount=fill_base_amount, + fill_quote_amount=fill_quote_amount, + fill_price=fill_price, + trade_id=trade_id, + fee_asset=fee_asset, + fee_paid=fee_paid, + fill_timestamp=fill_timestamp, + ) + self._order_tracker.process_trade_update(trade_update) + # all other event messages just change order state + # eg "CANCELLED" or "FILLED" + in_flight_order = self.in_flight_orders.get(client_order_id) + if in_flight_order is not None: + # localize order state event message values + update_timestamp = (int(event_message["update_timestamp"]),) + new_state = ( + self.constants.ORDER_STATE[event_message["order_state"]], + ) + # process an order update + order_update = OrderUpdate( + trading_pair=trading_pair, + client_order_id=client_order_id, + exchange_order_id=exchange_order_id, + update_timestamp=update_timestamp, + new_state=new_state, + ) + self._order_tracker.process_order_update(order_update=order_update) + await self._update_balances() + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + self.logger().error( + "Unexpected error in user stream listener loop.", exc_info=True + ) + await asyncio.sleep(1.0) + + async def _update_balances(self): + """ + use metanode.assets 'total' and 'free' to update + self._account_balances + self._account_available_balances + """ + self.dev_log("Updating Balances", level=2) + error_msg = None + if self._account_balances == {}: + self._auth.login() + try: + if await self.check_network() == NetworkStatus.NOT_CONNECTED: + for asset in self.constants.chain.ASSETS: + self._account_available_balances[asset] = Decimal(0) + self._account_balances[asset] = Decimal(0) + error_msg = "Error updating account balances: Metanode not connected. Bad key?" + self.logger().exception(error_msg) + else: + metanode_assets = self.metanode.assets + for asset in self.constants.chain.ASSETS: + self._account_available_balances[asset] = Decimal( + str(metanode_assets[asset]["balance"]["free"]) + ) + self._account_balances[asset] = Decimal( + str(metanode_assets[asset]["balance"]["total"]) + ) + except Exception as error: + for asset in self.constants.chain.ASSETS: + self._account_available_balances[asset] = Decimal(0) + self._account_balances[asset] = Decimal(0) + error_msg = f"Error updating account balances: {error.args}" + self.logger().exception(error_msg) + msgs = [ + "Available Balances", + self._account_available_balances, + "Total Balances", + self._account_balances, + ] + for msg in msgs: + self.dev_log(msg, level=1) + if error_msg is not None: + raise RuntimeError(error_msg) + + async def _update_time_synchronizer(self): + """ + Used to synchronize the local time with the server's time. + This class is useful when timestamp-based signatures + are required by the DEX for authentication. + Upon receiving a timestamped message from the server, + use `update_server_time_offset_with_time_provider` + to synchronize local time with the server's time. + """ + self.dev_log("_update_time_synchronizer", level=2) + if self.constants.hummingbot.SYNCHRONIZE: + synchro = self._time_synchronizer + try: + await synchro.update_server_time_offset_with_time_provider( + time_provider=self.metanode.timing["blocktime"] + ) + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + except Exception: + self.logger().exception("Error requesting server time") + raise diff --git a/hummingbot/connector/exchange/graphene/graphene_order_book.py b/hummingbot/connector/exchange/graphene/graphene_order_book.py new file mode 100644 index 0000000000..9685b9669b --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_order_book.py @@ -0,0 +1,94 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation, no-member, no-name-in-module +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_order_book v1.0.0 fork +~ +""" +# STANDARD MODULES +from typing import Dict, Optional + +# HUMMINGBOT MODULES +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import OrderBookMessage, OrderBookMessageType +from hummingbot.core.event.events import TradeType + + +class GrapheneOrderBook(OrderBook): + """ + standardizes formatting for trade history and orderbook snapshots + """ + + @classmethod + def snapshot_message_from_exchange( + cls, + msg: Dict[str, any], + timestamp: float, + metadata: Optional[Dict] = None, + ) -> OrderBookMessage: + """ + Creates a snapshot message with the order book snapshot message + :param msg: raw response from the exchange when requesting the order book + :param timestamp: the snapshot timestamp + :param metadata: a dictionary with extra information to add to the snapshot data + :return: a formatted OrderBookMessage SNAPSHOT + """ + if metadata: + msg.update(metadata) + return OrderBookMessage( + OrderBookMessageType.SNAPSHOT, + { + "trading_pair": msg["trading_pair"], + "update_id": msg["blocktime"], + "bids": msg["bids"], + "asks": msg["asks"], + }, + timestamp=timestamp, + ) + + @classmethod + def diff_message_from_exchange( + cls, + msg: Dict[str, any], + timestamp: Optional[float] = None, + metadata: Optional[Dict] = None, + ) -> OrderBookMessage: + """ + GrapheneOrderBook deals only in snapshot messages + """ + raise NotImplementedError(__doc__) + + @classmethod + def trade_message_from_exchange( + cls, + msg: Dict[str, any], + metadata: Optional[Dict] = None, + ): + """ + Creates a trade message with the trade event sent by the exchange + :param msg: the trade event details sent by the exchange + :param metadata: a dictionary with extra information to add to trade message + :return: a formatted OrderBookMessage TRADE + """ + if metadata: + msg.update(metadata) + return OrderBookMessage( + OrderBookMessageType.TRADE, + { + "trading_pair": msg["trading_pair"], + "trade_type": float(TradeType.SELL.value) + if msg["trade_type"] == "SELL" + else float(TradeType.BUY.value), + "trade_id": msg["trade_id"], + "update_id": msg["update_id"], + "price": msg["price"], + "amount": msg["amount"], + }, + timestamp=msg["update_id"] * 1e-3, + ) diff --git a/hummingbot/connector/exchange/graphene/graphene_order_book_tracker.py b/hummingbot/connector/exchange/graphene/graphene_order_book_tracker.py new file mode 100644 index 0000000000..eac35da277 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_order_book_tracker.py @@ -0,0 +1,217 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation, no-member, no-name-in-module, broad-except +# pylint: disable=too-many-instance-attributes +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_order_book_tracker v1.0.0 fork +~ +""" +# STANDARD MODULES +import asyncio +import logging +import time +from collections import defaultdict, deque +from typing import Deque, Dict, List, Optional + +# HUMMINGBOT MODULES +from hummingbot.connector.exchange.graphene.graphene_api_order_book_data_source import GrapheneAPIOrderBookDataSource +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.core.data_type.order_book import OrderBook +from hummingbot.core.data_type.order_book_message import OrderBookMessage +from hummingbot.core.data_type.order_book_tracker import OrderBookTracker +from hummingbot.core.utils.async_utils import safe_ensure_future +from hummingbot.logger import HummingbotLogger + +# GLOBAL CONSTANTS +CONSTANTS = GrapheneConstants() +DEV = False + + +class GrapheneOrderBookTracker(OrderBookTracker): + """ + continually update the bids and asks for each trading pair + """ + + _logger: Optional[HummingbotLogger] = None + + def __init__( + self, + domain: str, + trading_pairs: Optional[List[str]] = None, + **__, + ): + # ~ print("GrapheneOrderBookTracker") + self.domain = domain + self.constants = GrapheneConstants(domain) + + super().__init__( + data_source=GrapheneAPIOrderBookDataSource( + trading_pairs=self.constants.chain.PAIRS, + domain=domain, + ), + trading_pairs=self.constants.chain.PAIRS, + domain=domain, + ) + self._order_book_diff_stream: asyncio.Queue = asyncio.Queue() + self._order_book_snapshot_stream: asyncio.Queue = asyncio.Queue() + self._ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop() + self.domain = domain + self._saved_message_queues: Dict[str, Deque[OrderBookMessage]] = defaultdict( + lambda: deque(maxlen=1000) + ) + self._trading_pairs = self.constants.chain.PAIRS + self._order_book_stream_listener_task: Optional[asyncio.Task] = None + + @classmethod + def logger(cls) -> HummingbotLogger: + """ + a classmethod for logging + """ + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + def dev_log(self, *args, **kwargs): + """ + log only in dev mode + """ + if DEV: + self.logger().info(*args, **kwargs) + + @property + def exchange_name(self) -> str: + """ + the name of this graphene blockchain + """ + return self.constants.chain.NAME + + def start(self): + """ + Starts the background task that connects to the exchange + and listens to order book updates and trade events. + """ + super().start() + self._order_book_stream_listener_task = safe_ensure_future( + self._data_source.listen_for_subscriptions() + ) + + def stop(self): + """ + Stops the background task + """ + _ = ( + self._order_book_stream_listener_task + and self._order_book_stream_listener_task.cancel() + ) + super().stop() + + async def _order_book_diff_router(self): + """ + Routes the real-time order book diff messages to the correct order book. + """ + last_message_timestamp: float = time.time() + messages_accepted: int = 0 + messages_rejected: int = 0 + messages_queued: int = 0 + while True: + try: + ob_message: OrderBookMessage = await self._order_book_diff_stream.get() + trading_pair: str = ob_message.trading_pair + + if trading_pair not in self._tracking_message_queues: + messages_queued += 1 + # Save diff messages received before snapshots are ready + self._saved_message_queues[trading_pair].append(ob_message) + continue + message_queue: asyncio.Queue = self._tracking_message_queues[ + trading_pair + ] + # Check the order book's initial update ID. If it's larger, don't bother + order_book: OrderBook = self._order_books[trading_pair] + + if order_book.snapshot_uid > ob_message.update_id: + messages_rejected += 1 + continue + await message_queue.put(ob_message) + messages_accepted += 1 + + # Log some statistics. + now: float = time.time() + if int(now / 60.0) > int(last_message_timestamp / 60.0): + msg = ( + f"Diff messages processed: {messages_accepted}, " + f"rejected: {messages_rejected}, queued: {messages_queued}" + ) + self.logger().debug(msg) + messages_accepted = 0 + messages_rejected = 0 + messages_queued = 0 + last_message_timestamp = now + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + self.logger().network( + "Unexpected error routing order book messages.", + exc_info=True, + app_warning_msg=( + "Error routing order book messages. Retrying in 5 seconds." + ), + ) + await asyncio.sleep(5.0) + + async def _track_single_book(self, trading_pair: str): + + past_diffs_window: Deque[OrderBookMessage] = deque() + self._past_diffs_windows[trading_pair] = past_diffs_window + message_queue: asyncio.Queue = self._tracking_message_queues[trading_pair] + order_book: OrderBook = self._order_books[trading_pair] + last_message_timestamp: float = time.time() + diff_messages_accepted: int = 0 + + while True: + try: + saved_messages: Deque[OrderBookMessage] = self._saved_message_queues[ + trading_pair + ] + # Process saved messages first if there are any + if len(saved_messages) > 0: + message = saved_messages.popleft() # OrderBookMessage + diff_messages_accepted += len(saved_messages) + else: + message = await message_queue.get() # OrderBookMessage + past_diffs: List[OrderBookMessage] = list(past_diffs_window) + order_book.restore_from_snapshot_and_diffs(message, past_diffs) + msg = f"Processed order book snapshot for {trading_pair}." + self.logger().debug(msg) + # Output some statistics periodically. + now: float = time.time() + if int(now / 60.0) > int(last_message_timestamp / 60.0): + self.logger().debug( + f"Processed {diff_messages_accepted} order book diffs for" + f" {trading_pair}." + ) + diff_messages_accepted = 0 + last_message_timestamp = now + except asyncio.CancelledError: + msg = f"asyncio.CancelledError {__name__}" + self.logger().exception(msg) + raise + except Exception: + msg = f"Unexpected error tracking order book for {trading_pair}." + self.logger().network( + msg, + exc_info=True, + app_warning_msg=( + "Unexpected error tracking order book. Retrying after 5" + " seconds." + ), + ) + await asyncio.sleep(5.0) diff --git a/hummingbot/connector/exchange/graphene/graphene_user_stream_tracker.py b/hummingbot/connector/exchange/graphene/graphene_user_stream_tracker.py new file mode 100644 index 0000000000..247697b877 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_user_stream_tracker.py @@ -0,0 +1,82 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=bad-continuation, no-member, broad-except +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_user_stream_tracker v1.0.0 +~ +""" +# STANDARD MODULES +import asyncio +import logging +from typing import Optional + +# METANODE MODULES +from metanode.graphene_metanode_client import GrapheneTrustlessClient + +# HUMMINGBOT MODULES +from hummingbot.connector.exchange.graphene.graphene_api_user_stream_data_source import GrapheneAPIUserStreamDataSource +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.core.data_type.user_stream_tracker import UserStreamTracker +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.core.utils.async_utils import safe_ensure_future, safe_gather +from hummingbot.logger import HummingbotLogger + + +class GrapheneUserStreamTracker(UserStreamTracker): + """ + tracks fill orders, open orders, created orders, and cancelled orders + """ + + _logger: Optional[HummingbotLogger] = None + + def __init__(self, domain: str, order_tracker: UserStreamTracker, *_, **__): + # ~ print("GrapheneUserStreamTracker") + super().__init__(GrapheneAPIUserStreamDataSource(domain=domain, order_tracker=order_tracker)) + self._ev_loop: asyncio.events.AbstractEventLoop = asyncio.get_event_loop() + self._data_source: Optional[UserStreamTrackerDataSource] = None + self._user_stream_tracking_task: Optional[asyncio.Task] = None + self._order_tracker = order_tracker + self.domain = domain + self.constants = GrapheneConstants(domain) + self.metanode = GrapheneTrustlessClient(self.constants) + + @classmethod + def logger(cls) -> HummingbotLogger: + """ + a classmethod for logging + """ + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + @property + def data_source(self) -> UserStreamTrackerDataSource: + """ + Returns the instance of the data source that listens to the private user channel + to receive updates from the DEX. If the instance is not initialized it will + be created. + :return: the user stream instance that is listening to user updates + """ + # ~ print("GrapheneUserStreamTracker data_source") + if not self._data_source: + self._data_source = GrapheneAPIUserStreamDataSource( + domain=self.domain, order_tracker=self._order_tracker + ) + return self._data_source + + async def start(self): + """ + Starts the background task that connects to the DEX + and listens to user activity updates + """ + # ~ print("GrapheneUserStreamTracker start") + self._user_stream_tracking_task = safe_ensure_future( + self.data_source.listen_for_user_stream(self._ev_loop, self._user_stream) + ) + await safe_gather(self._user_stream_tracking_task) diff --git a/hummingbot/connector/exchange/graphene/graphene_utils.py b/hummingbot/connector/exchange/graphene/graphene_utils.py new file mode 100644 index 0000000000..fd3dab6b72 --- /dev/null +++ b/hummingbot/connector/exchange/graphene/graphene_utils.py @@ -0,0 +1,204 @@ +# DISABLE SELECT PYLINT TESTS +# pylint: disable=no-member +""" + ╔════════════════════════════════════════════════════╗ + ║ ╔═╗╦═╗╔═╗╔═╗╦ ╦╔═╗╔╗╔╔═╗ ╔╦╗╔═╗╔╦╗╔═╗╔╗╔╔═╗╔╦╗╔═╗ ║ + ║ ║ ╦╠╦╝╠═╣╠═╝╠═╣║╣ ║║║║╣ ║║║║╣ ║ ╠═╣║║║║ ║ ║║║╣ ║ + ║ ╚═╝╩╚═╩ ╩╩ ╩ ╩╚═╝╝╚╝╚═╝ ╩ ╩╚═╝ ╩ ╩ ╩╝╚╝╚═╝═╩╝╚═╝ ║ + ║ DECENTRALIZED EXCHANGE HUMMINGBOT CONNECTOR ║ + ╚════════════════════════════════════════════════════╝ +~ +forked from binance_utils v1.0.0 +~ +""" +# STANDARD MODULES +import os +import socket + +from pydantic import Field, SecretStr + +from hummingbot.client.config.config_data_types import BaseConnectorConfigMap, ClientFieldData + +# HUMMINGBOT MODULES +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.core.utils.tracking_nonce import get_tracking_nonce + + +class PeerplaysConfigMap(BaseConnectorConfigMap): + connector: str = Field(default="peerplays", client_data=None) + peerplays_user: str = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Peerplays username", + is_connect_key=True, + prompt_on_new=True, + ) + ) + peerplays_wif: SecretStr = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Peerplays WIF", + is_secure=True, + is_connect_key=True, + prompt_on_new=True, + ) + ) + peerplays_pairs: str = Field( + default="BTC-PPY,HIVE-PPY,HBD-PPY", + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Peerplays trading pairs in this format", + is_connect_key=True, + prompt_on_new=True, + ) + ) + + class Config: + title = "peerplays" + + +class PeerplaysTestnetConfigMap(BaseConnectorConfigMap): + connector: str = Field(default="peerplays_testnet", client_data=None) + peerplays_testnet_user: str = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Peerplays Testnet username", + is_connect_key=True, + prompt_on_new=True, + ) + ) + peerplays_testnet_wif: SecretStr = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Peerplays Testnet WIF", + is_secure=True, + is_connect_key=True, + prompt_on_new=True, + ) + ) + peerplays_testnet_pairs: str = Field( + default="TEST-ABC,TEST-DEFG", + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Peerplays Testnet trading pairs in this format", + is_connect_key=True, + prompt_on_new=True, + ) + ) + + class Config: + title = "peerplays_testnet" + + +class BitsharesConfigMap(BaseConnectorConfigMap): + connector: str = Field(default="bitshares", client_data=None) + bitshares_user: str = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Bitshares username", + is_connect_key=True, + prompt_on_new=True, + ) + ) + bitshares_wif: SecretStr = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Bitshares WIF", + is_secure=True, + is_connect_key=True, + prompt_on_new=True, + ) + ) + bitshares_pairs: str = Field( + default="BTS-HONEST.BTC", + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Bitshares trading pairs in this format", + is_connect_key=True, + prompt_on_new=True, + ) + ) + + class Config: + title = "bitshares" + + +class BitsharesTestnetConfigMap(BaseConnectorConfigMap): + connector: str = Field(default="bitshares_testnet", client_data=None) + bitshares_testnet_user: str = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Bitshares Testnet username", + is_connect_key=True, + prompt_on_new=True, + ) + ) + bitshares_testnet_wif: SecretStr = Field( + default=..., + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Bitshares Testnet WIF", + is_secure=True, + is_connect_key=True, + prompt_on_new=True, + ) + ) + bitshares_testnet_pairs: str = Field( + default="TEST-USD,TEST-CNY", + client_data=ClientFieldData( + prompt=lambda cm: "Enter your Bitshares Testnet trading pairs in this format", + is_connect_key=True, + prompt_on_new=True, + ) + ) + + class Config: + title = "bitshares_testnet" + + +# GLOBAL CONSTANTS +MSG = "active authority WIF key or press Enter for demonstration >>> " +CONSTANTS = GrapheneConstants() # NOTE: not blockchain specific here +CENTRALIZED = False +EXAMPLE_PAIR = "PPY-BTC" +DEFAULT_FEES = [0.1, 0.1] +KEYS = PeerplaysConfigMap.construct() +OTHER_DOMAINS = [ + "bitshares", + "peerplays_testnet", + "bitshares_testnet", +] +OTHER_DOMAINS_PARAMETER = { + "bitshares": "bitshares", + "peerplays_testnet": "peerplays_testnet", + "bitshares_testnet": "bitshares_testnet", +} +OTHER_DOMAINS_EXAMPLE_PAIR = { + "bitshares": "BTS-BTC", + "peerplays_testnet": "TEST-ABC", + "bitshares_testnet": "TEST-ABC", +} +OTHER_DOMAINS_DEFAULT_FEES = { + "bitshares": [0.1, 0.1], + "peerplays_testnet": [0.1, 0.1], + "bitshares_testnet": [0.1, 0.1], +} +OTHER_DOMAINS_KEYS = { + "bitshares": BitsharesConfigMap.construct(), + "peerplays_testnet": PeerplaysTestnetConfigMap.construct(), + "bitshares_testnet": BitsharesTestnetConfigMap.construct(), +} + + +def get_new_client_order_id(is_buy: bool, trading_pair: str) -> str: + """ + Creates a client order id for a new order + :param is_buy: True if the order is a buy order, False otherwise + :param trading_pair: the trading pair the order will be operating with + :return: an identifier for the new order to be used in the client + """ + base, quote = trading_pair.upper().split("-") + side = "B" if is_buy else "S" + base_str = f"{base[0]}{base[-1]}" + quote_str = f"{quote[0]}{quote[-1]}" + client_instance_id = hex(abs(hash(f"{socket.gethostname()}{os.getpid()}")))[2:6] + return ( + f"{CONSTANTS.hummingbot.ORDER_PREFIX}-{side}{base_str}{quote_str}" + + f"{client_instance_id}{get_tracking_nonce()}" + ) diff --git a/hummingbot/connector/exchange/graphene/unit_test_delint.py b/hummingbot/connector/exchange/graphene/unit_test_delint.py new file mode 100644 index 0000000000..411ae4c41c --- /dev/null +++ b/hummingbot/connector/exchange/graphene/unit_test_delint.py @@ -0,0 +1,186 @@ +r""" +black_all.py + +WTFPL litepresence.com Jan 2021 + +A simple script that blacks, isorts, and pylints *.py files +""" + +# STANDARD PYTHON MODULES +import os +from time import time + +# these can be safely ignored in most circumstances +DISABLE = ( + # too many? + "too-many-statements", + "too-many-locals", + "too-many-branches", + "too-many-function-args", + "too-many-arguments", + "too-many-nested-blocks", + "too-many-lines", + # improper exception handling + "bare-except", + "broad-except", + # snake_case, etc. + "invalid-name", + # sometimes it just can't find the modules referenced - on this machine + "import-error", + # whitespace authoritarianism + "bad-continuation", + "bad-whitespace", + # class minimums + "too-few-public-methods", + "no-self-use", + # suppression + "suppressed-message", + "locally-disabled", + "useless-suppression", +) + + +def auto_enumerate(name): + """ + swap enumerate() in place of range(len()) + """ + with open(name, "r") as handle: + data = handle.read() + handle.close() + + data = data.split("\n") + total = 0 + final_data = [] + for line in data: + if ", _ in enumerate(" in line and "):" in line: + line = line.replace(" in range(len(", ", _ in enumerate(").replace( + ")):", "):" + ) + total += 1 + final_data.append(line) + final_data = "\n".join(final_data).strip("\n") + "\n" + + with open(name, "w") as handle: + handle.write(final_data) + handle.close() + if total: + print(f"{total} range(len()) instances enumerated in {name}!") + + +def auto_broad_except(name): + """ + convert 'except:' to 'except Exception:' + """ + with open(name, "r") as handle: + data = handle.read() + handle.close() + + data = data.split("\n") + total = 0 + final_data = [] + for line in data: + if "except:" in line: + line = line.replace("except:", "except Exception:") + total += 1 + final_data.append(line) + final_data = "\n".join(final_data).strip("\n") + "\n" + + with open(name, "w") as handle: + handle.write(final_data) + handle.close() + if total: + print(f"{total} bare excepts replaced in {name}") + + +def auto_double_line_break(name): + """ + Remove extra line breaks + """ + + with open(name, "r") as handle: + data = handle.read() + handle.close() + total = 0 + for _ in range(3): + data_split = data.split("\n\n\n") + data = "\n\n".join(data_split) + total += len(data_split) - 1 + with open(name, "w") as handle: + handle.write(data) + handle.close() + if total: + print(f"{total} double line brakes replaced in {name}") + + +def main(): + """ + \033c\nWelcome to lite Black Pylint Lite All! \n + """ + print(main.__doc__) + dispatch = { + 1: "Black Pylint Lite All!", + 2: "Black Pylint All!", + 3: "Pylint Lite All Only", + 4: "Pylint All Only", + 5: "Black All Only", + } + print(" Menu\n") + for key, val in dispatch.items(): + print(" ", key, " : ", val) + choice = input("\n\nInput Number or Press Enter for Choice 1\n\n ") + if choice == "": + choice = 1 + choice = int(choice) + disabled = "" + if choice in [1, 3]: + disabled = "--enable=all --disable=" + for item in DISABLE: + disabled += item + "," + disabled.rstrip(",") + # Get the start time + start = time() + # Clear the screen + print("\033c") + # Get all of the python files in the current folder + pythons = [f for f in os.listdir() if f.endswith(".py") and f != "black_all.py"] + # pythons = [f for f in os.listdir() if f in ONLY] + + pythons = [f for f in pythons if "test" not in f] + + for name in pythons: + auto_double_line_break(name) + # For every file in that list: + if choice in [1, 2, 5]: + for name in pythons: + # Print the script we are blacking. + print("Blacking script:", name) + # Black the script. + os.system(f"black -l 88 --experimental-string-processing {name}") + # Print a divider. + print("-" * 100) + print("Isorting all scripts") + os.system("isort *.py") + for name in pythons: + auto_enumerate(name) + for name in pythons: + auto_broad_except(name) + if choice in [1, 2, 3, 4]: + for name in pythons: + # Print the script we are blacking. + print("Pylinting script:", name) + # Black the script. + os.system(f"pylint {name} {disabled}") + # Print a divider. + print("-" * 100) + # Say we are done. + print("Done.") + # Get the end time: + end = time() + # Find the time it took to black the scripts. + took = end - start + # Print that time. + print(len(pythons), "scripts took %.1f" % took, "seconds.") + + +if __name__ == "__main__": + main() diff --git a/hummingbot/core/data_type/order_book_tracker.py b/hummingbot/core/data_type/order_book_tracker.py index da498f860a..f423910f7e 100644 --- a/hummingbot/core/data_type/order_book_tracker.py +++ b/hummingbot/core/data_type/order_book_tracker.py @@ -159,6 +159,13 @@ async def _update_last_trade_prices_loop(self): args["domain"] = self._domain last_prices = await self._data_source.get_last_traded_prices(**args) for trading_pair, last_price in last_prices.items(): + if trading_pair not in self._order_books: + inverted_pair = "-".join(trading_pair.split("-")[::-1]) + if inverted_pair in self._order_books: + trading_pair = inverted_pair + last_price = 1 / last_price + else: + raise ValueError(f"{__class__.__name__} invalid trading pair") self._order_books[trading_pair].last_trade_price = last_price self._order_books[trading_pair].last_trade_price_rest_updated = time.perf_counter() else: diff --git a/hummingbot/core/rate_oracle/rate_oracle.py b/hummingbot/core/rate_oracle/rate_oracle.py index 545c311395..6d783629d4 100644 --- a/hummingbot/core/rate_oracle/rate_oracle.py +++ b/hummingbot/core/rate_oracle/rate_oracle.py @@ -16,6 +16,7 @@ from hummingbot.core.rate_oracle.sources.cube_rate_source import CubeRateSource from hummingbot.core.rate_oracle.sources.dexalot_rate_source import DexalotRateSource from hummingbot.core.rate_oracle.sources.gate_io_rate_source import GateIoRateSource +from hummingbot.core.rate_oracle.sources.graphene_rate_source import GrapheneRateSource from hummingbot.core.rate_oracle.sources.kucoin_rate_source import KucoinRateSource from hummingbot.core.rate_oracle.sources.rate_source_base import RateSourceBase from hummingbot.core.rate_oracle.utils import find_rate @@ -33,6 +34,10 @@ "coinbase_advanced_trade": CoinbaseAdvancedTradeRateSource, "cube": CubeRateSource, "dexalot": DexalotRateSource, + # lambdas so that when hummingbot later instantiates/runs this, it can run it without core changes + "peerplays": lambda: GrapheneRateSource("peerplays"), + "bitshares": lambda: GrapheneRateSource("bitshares"), + } diff --git a/hummingbot/core/rate_oracle/sources/graphene_rate_source.py b/hummingbot/core/rate_oracle/sources/graphene_rate_source.py new file mode 100644 index 0000000000..7f8b87590b --- /dev/null +++ b/hummingbot/core/rate_oracle/sources/graphene_rate_source.py @@ -0,0 +1,39 @@ +import asyncio +from decimal import Decimal +from typing import Dict, Optional + +from metanode.graphene_metanode_client import GrapheneTrustlessClient + +from hummingbot.connector.exchange.graphene.graphene_constants import GrapheneConstants +from hummingbot.core.rate_oracle.sources.rate_source_base import RateSourceBase + + +class GrapheneRateSource(RateSourceBase): + def __init__(self, domain): + self.domain = domain + super().__init__() + + @property + def name(self) -> str: + return self.domain + + async def get_prices(self, quote_token: Optional[str] = None) -> Dict[str, Decimal]: + constants = GrapheneConstants(self.domain) + metanode = GrapheneTrustlessClient(constants) + metanode_pairs = metanode.pairs # DISCRETE SQL QUERY + await asyncio.sleep(0.01) + results = {} + for pair in constants.chain.ALL_PAIRS: + try: + self.logger().info(metanode_pairs[pair]["last"]) + results[pair] = Decimal(metanode_pairs[pair]["last"]) + except Exception: + msg = ( + "Unexpected error while retrieving rates from Graphene. " + f"Check the log file for more info. Trading Pair {pair}" + ) + self.logger().error( + msg, + exc_info=True, + ) + return results diff --git a/setup.py b/setup.py index ce551eb07b..617a924583 100644 --- a/setup.py +++ b/setup.py @@ -79,6 +79,7 @@ def main(): "hexbytes", "importlib-metadata", "injective-py", + "metanode", "mypy-extensions", "msgpack", "nose", diff --git a/setup/environment.yml b/setup/environment.yml index c21dfe9d3c..81c2c551ae 100644 --- a/setup/environment.yml +++ b/setup/environment.yml @@ -51,6 +51,7 @@ dependencies: - importlib-metadata==0.23 - injective-py==1.6.* - jsonpickle==3.0.1 + - metanode - mypy-extensions==0.4.3 - msgpack - pandas_ta==0.3.14b