-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid dlp validator crashes eg if rpc goes down #23
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,9 @@ | |
from vana.utils.misc import get_block_explorer_url | ||
from vana.utils.web3 import decode_custom_error | ||
|
||
import threading | ||
from utils.circuit_breaker import CircuitBreaker | ||
|
||
logger = native_logging.getLogger("opendata") | ||
|
||
Balance = Union[int, Decimal] | ||
|
@@ -119,6 +122,25 @@ def __init__( | |
self.web3 = Web3(Web3.HTTPProvider(self.config.chain.chain_endpoint)) | ||
self.web3.middleware_onion.inject(geth_poa_middleware, layer=0) | ||
|
||
self.circuit_breaker = CircuitBreaker(failure_threshold=5, reset_timeout=3600) # 1 hour timeout | ||
self.rpc_health_thread = threading.Thread(target=self._check_rpc_health, daemon=True) | ||
self.rpc_health_thread.start() | ||
|
||
def _check_rpc_health(self): | ||
while True: | ||
self.check_rpc_health() | ||
# Check every minute | ||
time.sleep(60) | ||
|
||
def check_rpc_health(self): | ||
try: | ||
block_number = self.get_current_block() | ||
vana.logging.info(f"RPC is healthy. Current block number: {block_number}") | ||
return True | ||
except Exception as e: | ||
vana.logging.warning(f"RPC health check failed: {e}") | ||
return False | ||
|
||
@staticmethod | ||
def config() -> "config": | ||
parser = argparse.ArgumentParser() | ||
|
@@ -241,6 +263,7 @@ def read_contract_fn(self, function: ContractFunction): | |
except Exception as e: | ||
vana.logging.error(f"Failed to read from contract function: {e}") | ||
|
||
@retry(exceptions=(Exception,), tries=10, delay=1, backoff=2, max_delay=60) | ||
def get_current_block(self) -> int: | ||
""" | ||
Returns the current block number on the blockchain. This function provides the latest block | ||
|
@@ -252,13 +275,21 @@ def get_current_block(self) -> int: | |
Knowing the current block number is essential for querying real-time data and performing time-sensitive | ||
operations on the blockchain. It serves as a reference point for network activities and data synchronization. | ||
""" | ||
return self.web3.eth.block_number | ||
return self.circuit_breaker.call(self._get_current_block) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be used at a higher level, e.g. where the server is spawned? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm probably misunderstanding, but does the circuit breaker give us anything additional that the @Retry doesn't? |
||
|
||
def _get_current_block(self) -> int: | ||
try: | ||
return self.web3.eth.block_number | ||
except Exception as e: | ||
vana.logging.error(f"Error fetching current block number: {e}") | ||
raise | ||
|
||
def close(self): | ||
""" | ||
Cleans up resources for this ChainManager instance like active websocket connection and active extensions | ||
""" | ||
pass | ||
if hasattr(self, 'rpc_health_thread') and self.rpc_health_thread.is_alive(): | ||
self.rpc_health_thread.join(timeout=5) | ||
|
||
def get_total_stake_for_coldkey( | ||
self, h160_address: str, block: Optional[int] = None | ||
|
@@ -325,6 +356,7 @@ def determine_chain_endpoint_and_network(network: str): | |
#### Legacy #### | ||
################ | ||
|
||
@retry(exceptions=(Exception,), tries=10, delay=1, backoff=2, max_delay=60) | ||
def get_balance(self, address: str, block: Optional[int] = None) -> Balance: | ||
""" | ||
Retrieves the token balance of a specific address within the Vana network. This function queries | ||
|
@@ -340,21 +372,26 @@ def get_balance(self, address: str, block: Optional[int] = None) -> Balance: | |
This function is important for monitoring account holdings and managing financial transactions | ||
within the Vana ecosystem. It helps in assessing the economic status and capacity of network participants. | ||
""" | ||
return self.circuit_breaker.call(self._get_balance, address, block) | ||
|
||
def _get_balance(self, address: str, block: Optional[int] = None) -> Balance: | ||
vana.logging.info(f"Fetching balance for address {address}") | ||
try: | ||
@retry(delay=2, tries=3, backoff=2, max_delay=4, logger=logger) | ||
def make_web3_call_with_retry(): | ||
vana.logging.info(f"Fetching balance for address {address}") | ||
return self.web3.eth.get_balance(address, block_identifier=block) | ||
|
||
result = make_web3_call_with_retry() | ||
result = self.web3.eth.get_balance(address, block_identifier=block) | ||
except Exception as e: | ||
vana.logging.error(f"Error fetching balance for address {address}: {e}") | ||
return 0 | ||
raise | ||
|
||
return Web3.from_wei(result, "ether") | ||
|
||
def transfer( | ||
@retry(exceptions=(Exception,), tries=10, delay=1, backoff=2, max_delay=60) | ||
def transfer(self, wallet: "vana.Wallet", dest: str, amount: Union[Balance, float], | ||
wait_for_inclusion: bool = True, wait_for_finalization: bool = False, | ||
prompt: bool = False) -> bool: | ||
return self.circuit_breaker.call(self._transfer, wallet, dest, amount, | ||
wait_for_inclusion, wait_for_finalization, prompt) | ||
|
||
def _transfer( | ||
self, | ||
wallet: "vana.Wallet", | ||
dest: str, | ||
|
@@ -422,32 +459,36 @@ def transfer( | |
|
||
# Send the transaction. | ||
logger.info("Sending transaction...") | ||
txn_hash = self.web3.eth.send_raw_transaction(signed_txn.rawTransaction) | ||
|
||
# Wait for transaction inclusion. | ||
if wait_for_inclusion: | ||
try: | ||
receipt = self.web3.eth.wait_for_transaction_receipt(txn_hash, timeout=120) | ||
if receipt.status == 1: | ||
logger.info(f"Transaction included in block {receipt.blockNumber}. Hash: {txn_hash.hex()}") | ||
else: | ||
logger.error("Transaction failed.") | ||
try: | ||
txn_hash = self.web3.eth.send_raw_transaction(signed_txn.rawTransaction) | ||
|
||
# Wait for transaction inclusion. | ||
if wait_for_inclusion: | ||
try: | ||
receipt = self.web3.eth.wait_for_transaction_receipt(txn_hash, timeout=120) | ||
if receipt.status == 1: | ||
logger.info(f"Transaction included in block {receipt.blockNumber}. Hash: {txn_hash.hex()}") | ||
else: | ||
logger.error("Transaction failed.") | ||
return False | ||
except TransactionNotFound: | ||
logger.error("Transaction not found within timeout period.") | ||
return False | ||
except TransactionNotFound: | ||
logger.error("Transaction not found within timeout period.") | ||
return False | ||
|
||
# Wait for transaction finalization. | ||
if wait_for_finalization: | ||
try: | ||
while True: | ||
receipt = self.web3.eth.get_transaction_receipt(txn_hash) | ||
if receipt.blockNumber is not None: | ||
logger.info(f"Transaction finalized in block {receipt.blockNumber}. Hash: {txn_hash.hex()}") | ||
break | ||
time.sleep(2) | ||
except TransactionNotFound: | ||
logger.error("Transaction not found within timeout period.") | ||
return False | ||
# Wait for transaction finalization. | ||
if wait_for_finalization: | ||
try: | ||
while True: | ||
receipt = self.web3.eth.get_transaction_receipt(txn_hash) | ||
if receipt.blockNumber is not None: | ||
logger.info(f"Transaction finalized in block {receipt.blockNumber}. Hash: {txn_hash.hex()}") | ||
break | ||
time.sleep(2) | ||
except TransactionNotFound: | ||
logger.error("Transaction not found within timeout period.") | ||
return False | ||
|
||
return True | ||
return True | ||
except Exception as e: | ||
logger.error(f"Error during transfer: {e}") | ||
raise |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -84,6 +84,27 @@ def __init__( | |
if sync: | ||
self.sync(block=None, lite=lite) | ||
|
||
self.cache_file = f"state_cache_{network}_{dlp_uid}.json" | ||
|
||
def save_cache(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference between the cache and the state file? Seems to be saving the same data? |
||
cache_data = { | ||
"node_servers": [ns.__dict__ for ns in self.node_servers], | ||
"weights": self.weights, | ||
"last_update": self.last_update, | ||
"block": self.block | ||
} | ||
with open(self.cache_file, 'w') as f: | ||
json.dump(cache_data, f) | ||
|
||
def load_cache(self): | ||
if os.path.exists(self.cache_file): | ||
with open(self.cache_file, 'r') as f: | ||
cache_data = json.load(f) | ||
self.node_servers = set([vana.NodeServerInfo(**ns) for ns in cache_data['node_servers']]) | ||
self.weights = cache_data['weights'] | ||
self.last_update = cache_data['last_update'] | ||
self.block = cache_data['block'] | ||
|
||
def sync( | ||
self, | ||
block: Optional[int] = None, | ||
|
@@ -93,9 +114,14 @@ def sync( | |
""" | ||
Synchronizes the state with the network's current state. | ||
""" | ||
self.node_servers = chain_manager.get_active_node_servers() | ||
self.last_update = time.time() | ||
self.block = chain_manager.get_current_block() | ||
try: | ||
self.node_servers = chain_manager.get_active_node_servers() | ||
self.last_update = time.time() | ||
self.block = chain_manager.get_current_block() | ||
self.save_cache() | ||
except Exception as e: | ||
vana.logging.error(f"Failed to sync state: {e}") | ||
self.load_cache() | ||
|
||
def set_hotkeys(self, hotkeys: List[str]): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import time | ||
|
||
|
||
class CircuitBreaker: | ||
def __init__(self, failure_threshold, reset_timeout): | ||
self.failure_threshold = failure_threshold | ||
self.reset_timeout = reset_timeout | ||
self.failures = 0 | ||
self.last_failure_time = 0 | ||
self.state = "CLOSED" | ||
|
||
def call(self, func, *args, **kwargs): | ||
if self.state == "OPEN": | ||
if time.time() - self.last_failure_time > self.reset_timeout: | ||
self.state = "HALF-OPEN" | ||
else: | ||
raise Exception("Circuit is OPEN") | ||
|
||
try: | ||
result = func(*args, **kwargs) | ||
if self.state == "HALF-OPEN": | ||
self.state = "CLOSED" | ||
self.failures = 0 | ||
return result | ||
except Exception as e: | ||
self.failures += 1 | ||
self.last_failure_time = time.time() | ||
if self.failures >= self.failure_threshold: | ||
self.state = "OPEN" | ||
raise e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this, I feel this may get too noisy in the logs.