diff --git a/safe_transaction_service/history/services/index_service.py b/safe_transaction_service/history/services/index_service.py index 1d250efb7..cdff15c89 100644 --- a/safe_transaction_service/history/services/index_service.py +++ b/safe_transaction_service/history/services/index_service.py @@ -26,7 +26,7 @@ @dataclass -class IndexingStatus: +class AllIndexingStatus: current_block_number: int current_block_timestamp: int erc20_block_number: int @@ -39,10 +39,10 @@ class IndexingStatus: @dataclass -class ERC20IndexingStatus: +class SpecificIndexingStatus: current_block_number: int - erc20_block_number: int - erc20_synced: bool + block_number: int + synced: bool class IndexingException(Exception): @@ -126,16 +126,23 @@ def get_master_copies_current_indexing_block_number(self) -> Optional[int]: min_master_copies_block_number=Min("tx_block_number") )["min_master_copies_block_number"] - def get_indexing_status(self) -> IndexingStatus: - current_block = self.ethereum_client.get_block("latest") - current_block_number = current_block["number"] - - # Indexing points to the next block to be indexed, we need the previous ones + def get_erc20_indexing_status( + self, current_block_number: int + ) -> SpecificIndexingStatus: erc20_block_number = min( max(self.get_erc20_721_current_indexing_block_number() - 1, 0), current_block_number, ) + erc20_synced = ( + current_block_number - erc20_block_number <= self.eth_reorg_blocks + ) + return SpecificIndexingStatus( + current_block_number, erc20_block_number, erc20_synced + ) + def get_master_copies_indexing_status( + self, current_block_number: int + ) -> SpecificIndexingStatus: if ( master_copies_current_indexing_block_number := self.get_master_copies_current_indexing_block_number() ) is None: @@ -146,33 +153,50 @@ def get_indexing_status(self) -> IndexingStatus: current_block_number, ) - erc20_synced = ( - current_block_number - erc20_block_number <= self.eth_reorg_blocks - ) master_copies_synced = ( current_block_number - master_copies_block_number <= self.eth_reorg_blocks ) + return SpecificIndexingStatus( + current_block_number, master_copies_block_number, master_copies_synced + ) + + def get_indexing_status(self) -> AllIndexingStatus: + current_block = self.ethereum_client.get_block("latest") + current_block_number = current_block["number"] + + erc20_indexing_status = self.get_erc20_indexing_status(current_block_number) + master_copies_indexing_status = self.get_master_copies_indexing_status( + current_block_number + ) - if erc20_block_number == master_copies_block_number == current_block_number: + if ( + erc20_indexing_status.block_number + == master_copies_indexing_status.block_number + == current_block_number + ): erc20_block, master_copies_block = [current_block, current_block] else: erc20_block, master_copies_block = self.ethereum_client.get_blocks( - [erc20_block_number, master_copies_block_number] + [ + erc20_indexing_status.block_number, + master_copies_indexing_status.block_number, + ] ) current_block_timestamp = current_block["timestamp"] erc20_block_timestamp = erc20_block["timestamp"] master_copies_block_timestamp = master_copies_block["timestamp"] - return IndexingStatus( + return AllIndexingStatus( current_block_number=current_block_number, current_block_timestamp=current_block_timestamp, - erc20_block_number=erc20_block_number, + erc20_block_number=erc20_indexing_status.block_number, erc20_block_timestamp=erc20_block_timestamp, - erc20_synced=erc20_synced, - master_copies_block_number=master_copies_block_number, + erc20_synced=erc20_indexing_status.synced, + master_copies_block_number=master_copies_indexing_status.block_number, master_copies_block_timestamp=master_copies_block_timestamp, - master_copies_synced=master_copies_synced, - synced=erc20_synced and master_copies_synced, + master_copies_synced=master_copies_indexing_status.synced, + synced=erc20_indexing_status.synced + and master_copies_indexing_status.synced, ) def is_service_synced(self) -> bool: diff --git a/safe_transaction_service/history/tasks.py b/safe_transaction_service/history/tasks.py index c63147eb4..445d9734f 100644 --- a/safe_transaction_service/history/tasks.py +++ b/safe_transaction_service/history/tasks.py @@ -309,12 +309,21 @@ def process_decoded_internal_txs_for_safe_task( @app.shared_task(bind=True) @task_timeout(timeout_seconds=LOCK_TIMEOUT) -def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[int]: +def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> bool: """ Reindexes last hours for master copies to prevent indexing issues + + :param hours: Hours to reindex from now + :return: `True` if reindexing is triggered, `False` otherwise """ with contextlib.suppress(LockError): with only_one_running_task(self): + indexing_status = IndexServiceProvider().get_indexing_status() + if not indexing_status.master_copies_synced: + logger.warning( + "Reindexing master copies will not be executed as service is out of sync" + ) + return False if ethereum_block := EthereumBlock.objects.oldest_than( seconds=60 * 60 * hours ).first(): @@ -333,16 +342,27 @@ def reindex_mastercopies_last_hours_task(self, hours: float = 2.5) -> Optional[i reindex_master_copies_task.delay( from_block_number, to_block_number=to_block_number ) + return True + return False @app.shared_task(bind=True) @task_timeout(timeout_seconds=LOCK_TIMEOUT) -def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[int]: +def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> bool: """ Reindexes last hours for erx20 and erc721 to prevent indexing issues + + :param hours: Hours to reindex from now + :return: `True` if reindexing is triggered, `False` otherwise """ with contextlib.suppress(LockError): with only_one_running_task(self): + indexing_status = IndexServiceProvider().get_indexing_status() + if not indexing_status.erc20_synced: + logger.warning( + "Reindexing erc20/721 events will not be executed as service is out of sync" + ) + return False if ethereum_block := EthereumBlock.objects.oldest_than( seconds=60 * 60 * hours ).first(): @@ -361,6 +381,8 @@ def reindex_erc20_erc721_last_hours_task(self, hours: float = 2.5) -> Optional[i reindex_erc20_events_task.delay( from_block_number, to_block_number=to_block_number ) + return True + return False @app.shared_task(bind=True) diff --git a/safe_transaction_service/history/tests/test_tasks.py b/safe_transaction_service/history/tests/test_tasks.py index 769eb1f65..761186e99 100644 --- a/safe_transaction_service/history/tests/test_tasks.py +++ b/safe_transaction_service/history/tests/test_tasks.py @@ -9,7 +9,8 @@ from eth_account import Account -from ...events.services import QueueService +from safe_transaction_service.events.services import QueueService + from ...utils.redis import get_redis from ..indexers import ( Erc20EventsIndexerProvider, @@ -30,6 +31,7 @@ ReorgService, ) from ..services.collectibles_service import CollectibleWithMetadata +from ..services.index_service import SpecificIndexingStatus from ..tasks import ( check_reorgs_task, check_sync_status_task, @@ -128,16 +130,23 @@ def test_index_new_proxies_task(self): def test_index_safe_events_task(self): self.assertEqual(index_safe_events_task.delay().result, (0, 0)) + @patch.object(IndexService, "get_master_copies_indexing_status") @patch.object(IndexService, "reindex_master_copies") def test_reindex_mastercopies_last_hours_task( - self, reindex_master_copies_mock: MagicMock + self, + reindex_master_copies_mock: MagicMock, + get_master_copies_indexing_status_mock: MagicMock, ): + get_master_copies_indexing_status_mock.return_value = SpecificIndexingStatus( + 0, 0, True + ) + now = timezone.now() one_hour_ago = now - datetime.timedelta(hours=1) one_day_ago = now - datetime.timedelta(days=1) one_week_ago = now - datetime.timedelta(weeks=1) - reindex_mastercopies_last_hours_task() + self.assertFalse(reindex_mastercopies_last_hours_task()) reindex_master_copies_mock.assert_not_called() ethereum_block_0 = EthereumBlockFactory(timestamp=one_week_ago) @@ -145,37 +154,52 @@ def test_reindex_mastercopies_last_hours_task( ethereum_block_2 = EthereumBlockFactory(timestamp=one_hour_ago) ethereum_block_3 = EthereumBlockFactory(timestamp=now) - reindex_mastercopies_last_hours_task() + self.assertTrue(reindex_mastercopies_last_hours_task()) reindex_master_copies_mock.assert_called_once_with( ethereum_block_1.number, to_block_number=ethereum_block_3.number, addresses=None, ) + get_master_copies_indexing_status_mock.return_value = SpecificIndexingStatus( + 0, 0, False + ) + self.assertFalse(reindex_mastercopies_last_hours_task()) + + @patch.object(IndexService, "get_erc20_indexing_status") @patch.object(IndexService, "reindex_erc20_events") def test_reindex_erc20_erc721_last_hours_task( - self, reindex_erc20_events: MagicMock + self, + reindex_erc20_events_mock: MagicMock, + get_erc20_indexing_status_mock: MagicMock, ): + get_erc20_indexing_status_mock.return_value = SpecificIndexingStatus(0, 0, True) + now = timezone.now() one_hour_ago = now - datetime.timedelta(hours=1) one_day_ago = now - datetime.timedelta(days=1) one_week_ago = now - datetime.timedelta(weeks=1) - reindex_erc20_erc721_last_hours_task() - reindex_erc20_events.assert_not_called() + self.assertFalse(reindex_erc20_erc721_last_hours_task()) + reindex_erc20_events_mock.assert_not_called() ethereum_block_0 = EthereumBlockFactory(timestamp=one_week_ago) ethereum_block_1 = EthereumBlockFactory(timestamp=one_day_ago) ethereum_block_2 = EthereumBlockFactory(timestamp=one_hour_ago) ethereum_block_3 = EthereumBlockFactory(timestamp=now) - reindex_erc20_erc721_last_hours_task() - reindex_erc20_events.assert_called_once_with( + self.assertTrue(reindex_erc20_erc721_last_hours_task()) + reindex_erc20_events_mock.assert_called_once_with( ethereum_block_1.number, to_block_number=ethereum_block_3.number, addresses=None, ) + get_erc20_indexing_status_mock.return_value = SpecificIndexingStatus( + 0, 0, False + ) + self.assertFalse(reindex_erc20_erc721_last_hours_task()) + def test_process_decoded_internal_txs_task(self): owner = Account.create().address safe_address = Account.create().address