From 42ce7b17b7d651857e6046fd2aab81ea987f0229 Mon Sep 17 00:00:00 2001
From: William Blanke
Date: Mon, 9 Dec 2024 09:58:05 -0800
Subject: [PATCH] post processing 2 fixes

Call peak_post_processing_2 outside the blockchain priority mutex in
short_sync_batch and _finish_sync, and move the priority mutex and DB
writer acquisition into add_block itself, so short_sync_backtrack and
new_infusion_point_vdf no longer need to wrap that call themselves.

---
 chia/full_node/full_node.py | 273 ++++++++++++++++++------------------
 1 file changed, 139 insertions(+), 134 deletions(-)

diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py
index 75434bce16e9..f8f5c2126872 100644
--- a/chia/full_node/full_node.py
+++ b/chia/full_node/full_node.py
@@ -4,6 +4,7 @@
 import contextlib
 import copy
 import dataclasses
+import inspect
 import logging
 import multiprocessing
 import random
@@ -364,17 +365,17 @@ async def manage(self) -> AsyncIterator[None]:
             async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
                 self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} mempool new_peak")
                 pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), None)
-                self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} mempool new_peak")
-                assert len(pending_tx.items) == 0  # no pending transactions when starting up
+                assert len(pending_tx.items) == 0  # no pending transactions when starting up
 
-            full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak()
-            assert full_peak is not None
-            state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], [])
-            ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
-                full_peak, state_change_summary, None
-            )
-            await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result)
+                full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak()
+                assert full_peak is not None
+                state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], [])
+                ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
+                    full_peak, state_change_summary, None
+                )
+                await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result)
+                self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} mempool new_peak")
 
             if self.config["send_uncompact_interval"] != 0:
                 sanitize_weight_proof_only = False
                 if "sanitize_weight_proof_only" in self.config:
@@ -674,6 +675,12 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
                 if not response:
                     raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}")
                 self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short batch syncing")
+
+                peak_fb = None
+                state_change_summary = None
+                ppp_result = None
+
                 async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
                     self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short batch syncing")
                     state_change_summary: Optional[StateChangeSummary]
@@ -716,7 +723,6 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
                                 state_change_summary,
                                 peer,
                             )
-                            await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result)
                     except Exception:
                         # Still do post processing after cancel (or exception)
                         peak_fb = await self.blockchain.get_full_peak()
@@ -725,9 +731,12 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
                         raise
                     finally:
                         self.log.info(f"Added blocks {height}-{end_height}")
+                self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short batch syncing")
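+                # The priority mutex has been released at this point, so the peak
+                # post-processing below runs outside the lock.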
+                if peak_fb is not None:
+                    await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result)
         finally:
             self.sync_store.batch_syncing.remove(peer.peer_node_id)
-            self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short batch syncing")
+
         return True
 
     async def short_sync_backtrack(
@@ -787,28 +796,9 @@ async def short_sync_backtrack(
             )
 
             for block in reversed(blocks):
-                # Wrap add_block with writer to ensure all writes and reads are on same connection.
-                # add_block should only be called under priority_mutex so this will not stall other
-                # writes to the DB.
-                self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack")
-                async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
-                    self.log.info(
-                        f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack"
-                    )
-                    if self.blockchain.contains_block(block.header_hash):
-                        self.log.info(f"short_sync_backtrack main has {block.header_hash.hex()}")
-                    else:
-                        async with self.block_store.db_wrapper.writer() as conn:
-                            self.log.info(
-                                f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}"
-                            )
-                            # when syncing, we won't share any signatures with the
-                            # mempool, so there's no need to pass in the BLS cache.
-                            await self.add_block(block, peer, fork_info=fork_info)
-                            self.log.info(
-                                f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}"
-                            )
-                self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack")
+                # when syncing, we won't share any signatures with the
+                # mempool, so there's no need to pass in the BLS cache.
+                await self.add_block(block, peer, fork_info=fork_info)
 
         except (asyncio.CancelledError, Exception):
             self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id)
@@ -1862,6 +1852,10 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None:
         if self._server is None:
             return None
 
+        peak_fb = None
+        state_change_summary = None
+        ppp_result = None
+
         self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} _finish_sync")
         async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
             self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} _finish_sync")
@@ -1875,9 +1869,11 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None:
                 ppp_result: PeakPostProcessingResult = await self.peak_post_processing(
                     peak_fb, state_change_summary, None
                 )
-                await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result)
             self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} _finish_sync")
 
+        if peak_fb is not None:
+            await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result)
+
         if peak is not None and self.weight_proof_handler is not None:
             await self.weight_proof_handler.get_proof_of_weight(peak.header_hash)
             self._state_changed("block")
@@ -2229,100 +2225,118 @@ async def add_block(
             return await self.add_block(new_block, peer, bls_cache)
         state_change_summary: Optional[StateChangeSummary] = None
         ppp_result: Optional[PeakPostProcessingResult] = None
+
+        validation_time = 0.0
+
         async with enable_profiler(self.profile_block_validation) as pr:
-            # After acquiring the lock, check again, because another asyncio thread might have added it
-            if self.blockchain.contains_block(header_hash):
-                return None
-            validation_start = time.monotonic()
-            # Tries to add the block to the blockchain, if we already validated transactions, don't do it again
-            conds = None
-            if pre_validation_result is not None and pre_validation_result.conds is not None:
-                conds = pre_validation_result.conds
+            self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} add_block")
+            async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
+                self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} add_block")
+                async with self.block_store.db_wrapper.writer() as conn:
+                    self.log.info(f"BEGIN WJB task {asyncio.current_task().get_name()} add_block writer {conn}")
+                    frame = inspect.currentframe().f_back
+                    self.log.info(f"WJB A {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}")
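+                    # add_block now takes the priority mutex and opens the DB writer
+                    # itself, so callers (short_sync_backtrack, new_infusion_point_vdf)
+                    # no longer wrap this call with their own lock and writer.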
 
-            # Don't validate signatures because we want to validate them in the main thread later, since we have a
-            # cache available
-            prev_b = None
-            prev_ses_block = None
-            if block.height > 0:
-                prev_b = await self.blockchain.get_block_record_from_db(block.prev_header_hash)
-                assert prev_b is not None
-                curr = prev_b
-                while curr.height > 0 and curr.sub_epoch_summary_included is None:
-                    curr = self.blockchain.block_record(curr.prev_hash)
-                prev_ses_block = curr
-            new_slot = len(block.finished_sub_slots) > 0
-            ssi, diff = get_next_sub_slot_iters_and_difficulty(self.constants, new_slot, prev_b, self.blockchain)
-            future = await pre_validate_block(
-                self.blockchain.constants,
-                AugmentedBlockchain(self.blockchain),
-                block,
-                self.blockchain.pool,
-                conds,
-                ValidationState(ssi, diff, prev_ses_block),
-            )
-            pre_validation_result = await future
-            added: Optional[AddBlockResult] = None
-            pre_validation_time = time.monotonic() - validation_start
-            try:
-                if pre_validation_result.error is not None:
-                    if Err(pre_validation_result.error) == Err.INVALID_PREV_BLOCK_HASH:
-                        added = AddBlockResult.DISCONNECTED_BLOCK
-                        error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH
-                    elif Err(pre_validation_result.error) == Err.TIMESTAMP_TOO_FAR_IN_FUTURE:
-                        raise TimestampError()
-                    else:
-                        raise ValueError(
-                            f"Failed to validate block {header_hash} height "
-                            f"{block.height}: {Err(pre_validation_result.error).name}"
-                        )
-                else:
-                    if fork_info is None:
-                        fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash)
-                    (added, error_code, state_change_summary) = await self.blockchain.add_block(
-                        block, pre_validation_result, ssi, fork_info
+                    # After acquiring the lock, check again, because another asyncio thread might have added it
+                    if self.blockchain.contains_block(header_hash):
+                        return None
+                    validation_start = time.monotonic()
+                    # Tries to add the block to the blockchain, if we already validated transactions, don't do it again
+                    conds = None
+                    if pre_validation_result is not None and pre_validation_result.conds is not None:
+                        conds = pre_validation_result.conds
+
+                    # Don't validate signatures because we want to validate them in the main thread later, since we
+                    # have a cache available
+                    prev_b = None
+                    prev_ses_block = None
+                    if block.height > 0:
+                        prev_b = await self.blockchain.get_block_record_from_db(block.prev_header_hash)
+                        assert prev_b is not None
+                        curr = prev_b
+                        while curr.height > 0 and curr.sub_epoch_summary_included is None:
+                            curr = self.blockchain.block_record(curr.prev_hash)
+                        prev_ses_block = curr
+                    new_slot = len(block.finished_sub_slots) > 0
+                    ssi, diff = get_next_sub_slot_iters_and_difficulty(
+                        self.constants, new_slot, prev_b, self.blockchain
                     )
-                if added == AddBlockResult.ALREADY_HAVE_BLOCK:
-                    return None
-                elif added == AddBlockResult.INVALID_BLOCK:
-                    assert error_code is not None
self.log.error(f"Block {header_hash} at height {block.height} is invalid with code {error_code}.") - raise ConsensusError(error_code, [header_hash]) - elif added == AddBlockResult.DISCONNECTED_BLOCK: - self.log.info(f"Disconnected block {header_hash} at height {block.height}") - if raise_on_disconnected: - raise RuntimeError("Expected block to be added, received disconnected block.") - return None - elif added == AddBlockResult.NEW_PEAK: - # Evict any related BLS cache entries as we no longer need them - if bls_cache is not None and pre_validation_result.conds is not None: - pairs_pks, pairs_msgs = pkm_pairs( - pre_validation_result.conds, - self.constants.AGG_SIG_ME_ADDITIONAL_DATA, - ) - bls_cache.evict(pairs_pks, pairs_msgs) - # Only propagate blocks which extend the blockchain (becomes one of the heads) - assert state_change_summary is not None - post_process_time = time.monotonic() - ppp_result = await self.peak_post_processing(block, state_change_summary, peer) - post_process_time = time.monotonic() - post_process_time - - elif added == AddBlockResult.ADDED_AS_ORPHAN: - self.log.info( - f"Received orphan block of height {block.height} rh {block.reward_chain_block.get_hash()}" + future = await pre_validate_block( + self.blockchain.constants, + AugmentedBlockchain(self.blockchain), + block, + self.blockchain.pool, + conds, + ValidationState(ssi, diff, prev_ses_block), ) - post_process_time = 0 - else: - # Should never reach here, all the cases are covered - raise RuntimeError(f"Invalid result from add_block {added}") - except asyncio.CancelledError: - # We need to make sure to always call this method even when we get a cancel exception, to make sure - # the node stays in sync - if added == AddBlockResult.NEW_PEAK: - assert state_change_summary is not None - await self.peak_post_processing(block, state_change_summary, peer) - raise - - validation_time = time.monotonic() - validation_start + pre_validation_result = await future + added: Optional[AddBlockResult] = None + pre_validation_time = time.monotonic() - validation_start + try: + if pre_validation_result.error is not None: + if Err(pre_validation_result.error) == Err.INVALID_PREV_BLOCK_HASH: + added = AddBlockResult.DISCONNECTED_BLOCK + error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH + elif Err(pre_validation_result.error) == Err.TIMESTAMP_TOO_FAR_IN_FUTURE: + raise TimestampError() + else: + raise ValueError( + f"Failed to validate block {header_hash} height " + f"{block.height}: {Err(pre_validation_result.error).name}" + ) + else: + if fork_info is None: + fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) + (added, error_code, state_change_summary) = await self.blockchain.add_block( + block, pre_validation_result, ssi, fork_info + ) + if added == AddBlockResult.ALREADY_HAVE_BLOCK: + return None + elif added == AddBlockResult.INVALID_BLOCK: + assert error_code is not None + self.log.error( + f"Block {header_hash} at height {block.height} is invalid with code {error_code}." 
+                    finally:
+                        self.log.info(f"END WJB task {asyncio.current_task().get_name()} add_block writer {conn}")
+                self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} add_block")
+
+        validation_time = time.monotonic() - validation_start
 
         if ppp_result is not None:
             assert state_change_summary is not None
@@ -2766,16 +2780,7 @@ async def new_infusion_point_vdf(
                 self.log.warning("Trying to make a pre-farm block but height is not 0")
                 return None
             try:
-                self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} timelord add_block")
-                async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
-                    self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} timelord add_block")
-                    async with self.block_store.db_wrapper.writer() as conn:
-                        self.log.info(
-                            f"BEGIN WJB task {asyncio.current_task().get_name()} timelord add_block writer {conn}"
-                        )
-                        await self.add_block(block, None, self._bls_cache, raise_on_disconnected=True)
-                        self.log.info(f"END WJB task {asyncio.current_task().get_name()} timelord add_block writer {conn}")
-                self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} timelord add_block")
+                await self.add_block(block, None, self._bls_cache, raise_on_disconnected=True)
             except Exception as e:
                 self.log.warning(f"Consensus error validating block: {e}")
                 if timelord_peer is not None: