Skip to content

Commit

Permalink
Denormalize InternalTxDecoded
Browse files Browse the repository at this point in the history
- Add `safe` field so it can be indexed propertly
- This PR will speed up transaction processing speed
  • Loading branch information
Uxio0 committed Nov 4, 2024
1 parent c35bba6 commit 84b73b4
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 67 deletions.
8 changes: 2 additions & 6 deletions safe_transaction_service/history/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,7 @@ def queryset(self, request, queryset):
if self.value() == "YES":
return queryset.filter(
Q(
Exists(
SafeContract.objects.filter(
address=OuterRef("internal_tx___from")
)
)
Exists(SafeContract.objects.filter(address=OuterRef("safe")))
) # Just Safes indexed
| Q(function_name="setup") # Safes pending to be indexed
)
Expand Down Expand Up @@ -260,7 +256,7 @@ class InternalTxDecodedAdmin(AdvancedAdminSearchMixin, admin.ModelAdmin):
search_fields = [
"==function_name",
"==internal_tx__to",
"==internal_tx___from",
"==safe",
"==internal_tx__ethereum_tx__tx_hash",
"==internal_tx__block_number",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def _get_internal_txs_to_decode(
function_name=function_name,
arguments=arguments,
processed=False,
safe=internal_tx._from,
)
except CannotDecode as exc:
logger.debug("Cannot decode %s: %s", data.hex(), exc)
Expand Down
19 changes: 12 additions & 7 deletions safe_transaction_service/history/indexers/safe_events_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,14 @@ def _is_setup_indexed(self, safe_address: ChecksumAddress):
:param safe_address:
:return: ``True`` if ``SafeSetup`` event was processed, ``False`` otherwise
"""
return InternalTxDecoded.objects.filter(
function_name="setup",
internal_tx___from=safe_address,
internal_tx__contract_address=None,
).exists()
return (
InternalTxDecoded.objects.for_safe(safe_address)
.filter(
function_name="setup",
internal_tx__contract_address=None,
)
.exists()
)

@transaction.atomic
def decode_elements(self, *args) -> List[EventData]:
Expand Down Expand Up @@ -368,6 +371,7 @@ def _process_decoded_element(
internal_tx=internal_tx,
function_name="",
arguments=args,
safe=safe_address,
)

if event_name == "ProxyCreation" or event_name == "SafeSetup":
Expand Down Expand Up @@ -518,10 +522,10 @@ def _process_safe_creation_events(
# Check if were indexed
safe_creation_events_addresses = set(safe_addresses_with_creation_events.keys())
indexed_addresses = InternalTxDecoded.objects.filter(
internal_tx___from__in=safe_creation_events_addresses,
safe__in=safe_creation_events_addresses,
function_name="setup",
internal_tx__contract_address=None,
).values_list("internal_tx___from", flat=True)
).values_list("safe", flat=True)
# Ignoring the already indexed contracts
addresses_to_index = safe_creation_events_addresses - set(indexed_addresses)

Expand Down Expand Up @@ -582,6 +586,7 @@ def _process_safe_creation_events(
internal_tx=internal_tx,
function_name="setup",
arguments=setup_args,
safe=safe_address,
)
internal_txs.append(internal_tx)
internal_decoded_txs.append(internal_tx_decoded)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def handle(self, *args, **options):
internal_tx=internal_tx,
function_name=function_name,
arguments=arguments,
safe=internal_tx._from,
)
found += 1
self.stdout.write(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Generated by Django 5.0.9 on 2024-11-04 13:16

from django.db import migrations, models

import safe_eth.eth.django.models
from safe_eth.eth.constants import NULL_ADDRESS


class Migration(migrations.Migration):

dependencies = [
("history", "0090_multisigtransaction_proposed_by_delegate"),
]

operations = [
migrations.RemoveIndex(
model_name="internaltxdecoded",
name="history_decoded_processed_idx",
),
migrations.AddField(
model_name="internaltxdecoded",
name="safe",
field=safe_eth.eth.django.models.EthereumAddressBinaryField(
db_index=True, default=NULL_ADDRESS
),
preserve_default=False,
),
migrations.RunSQL(
"""
UPDATE "history_internaltxdecoded" decoded SET safe = internal._from
FROM "history_internaltx" internal
WHERE internal.id = decoded.internal_tx_id
"""
),
migrations.AddIndex(
model_name="internaltxdecoded",
index=models.Index(
condition=models.Q(("processed", False)),
fields=["safe"],
name="history_decoded_processed_idx",
),
),
]
13 changes: 5 additions & 8 deletions safe_transaction_service/history/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ def for_safe(self, safe_address: ChecksumAddress):
:param safe_address:
:return: Queryset of all InternalTxDecoded for one Safe with `safe_address`
"""
return self.filter(internal_tx___from=safe_address)
return self.filter(safe=safe_address)

def processed(self):
return self.filter(processed=True)
Expand Down Expand Up @@ -1204,19 +1204,15 @@ def pending_for_safe(self, safe_address: ChecksumAddress):
"""
return (
self.pending_for_safes()
.filter(internal_tx___from=safe_address)
.for_safe(safe_address)
.select_related("internal_tx", "internal_tx__ethereum_tx")
)

def safes_pending_to_be_processed(self) -> QuerySet[ChecksumAddress]:
"""
:return: List of Safe addresses that have transactions pending to be processed
"""
return (
self.not_processed()
.values_list("internal_tx___from", flat=True)
.distinct("internal_tx___from")
)
return self.not_processed().values_list("safe", flat=True).distinct("safe")


class InternalTxDecoded(models.Model):
Expand All @@ -1230,12 +1226,13 @@ class InternalTxDecoded(models.Model):
function_name = models.CharField(max_length=256, db_index=True)
arguments = JSONField()
processed = models.BooleanField(default=False)
safe = EthereumAddressBinaryField(db_index=True)

class Meta:
indexes = [
models.Index(
name="history_decoded_processed_idx",
fields=["processed"],
fields=["safe"],
condition=Q(processed=False),
)
]
Expand Down
6 changes: 3 additions & 3 deletions safe_transaction_service/history/services/index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def _reprocess(self, addresses: List[str]):
logger.info("Mark all internal txs decoded as not processed")
queryset = InternalTxDecoded.objects.all()
if addresses:
queryset = queryset.filter(internal_tx___from__in=addresses)
queryset = queryset.filter(safe__in=addresses)
queryset.update(processed=False)

@transaction.atomic
Expand Down Expand Up @@ -411,8 +411,8 @@ def fix_out_of_order(
"[%s] Marking InternalTxDecoded newer than timestamp as not processed",
address,
)
InternalTxDecoded.objects.filter(
internal_tx___from=address, internal_tx__timestamp__gte=timestamp
InternalTxDecoded.objects.for_safe(address).filter(
internal_tx__timestamp__gte=timestamp
).update(processed=False)
logger.info("[%s] Removing SafeStatus newer than timestamp", address)
SafeStatus.objects.filter(
Expand Down
5 changes: 4 additions & 1 deletion safe_transaction_service/history/tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,12 @@ class Params:
"operation": 0,
}

internal_tx = factory.SubFactory(InternalTxFactory)
internal_tx = factory.SubFactory(
InternalTxFactory, _from=factory.SelfAttribute("..safe")
)
function_name = factory.fuzzy.FuzzyText(prefix="safe-", suffix="fn")
processed = False
safe = factory.LazyFunction(lambda: Account.create().address)

@factory.lazy_attribute
def arguments(self) -> Dict[str, Any]:
Expand Down
4 changes: 2 additions & 2 deletions safe_transaction_service/history/tests/test_index_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def test_process_decoded_txs(self):

setup_internal_tx = InternalTxDecodedFactory(
function_name="setup",
internal_tx___from=safe_address,
safe=safe_address,
)
self.assertEqual(self.index_service.process_decoded_txs(safe_address), 1)
fix_out_of_order_mock.assert_not_called()
Expand All @@ -152,7 +152,7 @@ def test_process_decoded_txs(self):
exec_transactions = [
InternalTxDecodedFactory(
function_name="execTransaction",
internal_tx___from=safe_address,
safe=safe_address,
)
for _ in range(3)
]
Expand Down
49 changes: 49 additions & 0 deletions safe_transaction_service/history/tests/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from django_test_migrations.migrator import Migrator
from eth_account import Account
from safe_eth.eth.constants import NULL_ADDRESS
from safe_eth.eth.utils import fast_keccak, fast_keccak_text


Expand Down Expand Up @@ -363,3 +364,51 @@ def test_migration_0082_safecontract_created(self):
self.assertEqual(
safe_contract.created, safe_contract.ethereum_tx.block.timestamp
)

def test_migration_0091_add_safe_internal_tx_decoded(self):
# Add `safe` field to InternalTxDecoded
old_state = self.migrator.apply_initial_migration(
("history", "0090_multisigtransaction_proposed_by_delegate"),
)

EthereumBlock = old_state.apps.get_model("history", "EthereumBlock")
EthereumTx = old_state.apps.get_model("history", "EthereumTx")
InternalTx = old_state.apps.get_model("history", "InternalTx")
InternalTxDecoded = old_state.apps.get_model("history", "InternalTxDecoded")
ethereum_tx = self.build_ethereum_tx(EthereumBlock, EthereumTx)
random_safe_address = Account.create().address
internal_tx = InternalTx.objects.create(
ethereum_tx=ethereum_tx,
timestamp=timezone.now(),
block_number=ethereum_tx.block.number,
_from=random_safe_address,
gas=5,
data=b"",
to=NULL_ADDRESS,
value=0,
gas_used=10,
contract_address=None,
code=None,
output=None,
refund_address=None,
tx_type=0,
call_type=0,
trace_address="0",
error=None,
)
InternalTxDecoded.objects.create(
internal_tx=internal_tx,
function_name="anything",
arguments={},
processed=True,
)

new_state = self.migrator.apply_tested_migration(
("history", "0091_add_safe_internal_tx_decoded"),
)

InternalTxDecodedNew = new_state.apps.get_model("history", "InternalTxDecoded")
internal_tx_decoded_new = InternalTxDecodedNew.objects.get()
self.assertEqual(
internal_tx_decoded_new.safe, internal_tx._from, random_safe_address
)
27 changes: 13 additions & 14 deletions safe_transaction_service/history/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,10 @@ def test_internal_txs_can_be_decoded(self):
self.assertEqual(InternalTx.objects.can_be_decoded().count(), 1)

InternalTxDecoded.objects.create(
function_name="alo", arguments={}, internal_tx=internal_tx
function_name="alo",
arguments={},
internal_tx=internal_tx,
safe=internal_tx._from,
)
self.assertEqual(InternalTx.objects.can_be_decoded().count(), 0)

Expand Down Expand Up @@ -743,18 +746,14 @@ def test_safes_pending_to_be_processed(self):
)

safe_address_1 = SafeContractFactory().address
internal_tx_decoded_1 = InternalTxDecodedFactory(
internal_tx___from=safe_address_1
)
InternalTxDecodedFactory(internal_tx___from=safe_address_1)
internal_tx_decoded_1 = InternalTxDecodedFactory(safe=safe_address_1)
InternalTxDecodedFactory(safe=safe_address_1)
results = InternalTxDecoded.objects.safes_pending_to_be_processed()
self.assertIsInstance(results, QuerySet)
self.assertCountEqual(results, [safe_address_1])

safe_address_2 = SafeContractFactory().address
internal_tx_decoded_2 = InternalTxDecodedFactory(
internal_tx___from=safe_address_2
)
internal_tx_decoded_2 = InternalTxDecodedFactory(safe=safe_address_2)
self.assertCountEqual(
InternalTxDecoded.objects.safes_pending_to_be_processed(),
[safe_address_1, safe_address_2],
Expand All @@ -772,7 +771,7 @@ def test_out_of_order_for_safe(self):
self.assertFalse(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

i = InternalTxDecodedFactory(
internal_tx___from=random_safe,
safe=random_safe,
internal_tx__block_number=10,
processed=False,
)
Expand All @@ -782,14 +781,14 @@ def test_out_of_order_for_safe(self):
self.assertFalse(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

InternalTxDecodedFactory(
internal_tx___from=random_safe,
safe=random_safe,
internal_tx__block_number=11,
processed=False,
)
self.assertFalse(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

InternalTxDecodedFactory(
internal_tx___from=random_safe, internal_tx__block_number=9, processed=False
safe=random_safe, internal_tx__block_number=9, processed=False
)
self.assertTrue(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))
i.processed = False
Expand All @@ -798,17 +797,17 @@ def test_out_of_order_for_safe(self):
self.assertFalse(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

InternalTxDecodedFactory(
internal_tx___from=random_safe, internal_tx__block_number=8, processed=True
safe=random_safe, internal_tx__block_number=8, processed=True
)
self.assertFalse(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

InternalTxDecodedFactory(
internal_tx___from=random_safe, internal_tx__block_number=9, processed=True
safe=random_safe, internal_tx__block_number=9, processed=True
)
self.assertFalse(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

InternalTxDecodedFactory(
internal_tx___from=random_safe, internal_tx__block_number=10, processed=True
safe=random_safe, internal_tx__block_number=10, processed=True
)
self.assertTrue(InternalTxDecoded.objects.out_of_order_for_safe(random_safe))

Expand Down
4 changes: 2 additions & 2 deletions safe_transaction_service/history/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def test_process_decoded_internal_txs_task(self):
threshold=threshold,
fallback_handler=fallback_handler,
internal_tx__to=master_copy,
internal_tx___from=safe_address,
safe=safe_address,
)
process_decoded_internal_txs_task.delay()
self.assertTrue(SafeContract.objects.get(address=safe_address))
Expand All @@ -211,7 +211,7 @@ def test_process_decoded_internal_txs_for_banned_safe(self):
threshold=threshold,
fallback_handler=fallback_handler,
internal_tx__to=master_copy,
internal_tx___from=safe_address,
safe=safe_address,
)
SafeContractFactory(address=safe_address, banned=True)
self.assertTrue(SafeContract.objects.get(address=safe_address).banned)
Expand Down
Loading

0 comments on commit 84b73b4

Please sign in to comment.