
Add batch handler option in index #922

Closed
pyk opened this issue Jan 20, 2024 · 2 comments
Labels: community (Reported or suggested by our awesome users), feature (New feature or request)

pyk commented Jan 20, 2024

What feature would you like to see in DipDup?

I want to create a handler that handles a batch of events instead of a single event.

Current setup:

  raw_events_uniswap_v2_pair_sync:
    kind: evm.subsquid.events
    datasource: subsquid_node
    first_level: 10000835
    handlers:
      - callback: on_pair_sync
        contract: uniswap_v2_pair
        name: Sync

I want my handler to have an option like batch: true

  raw_events_uniswap_v2_pair_sync:
    kind: evm.subsquid.events
    datasource: subsquid_node
    first_level: 10000835
    handlers:
      - callback: on_pair_sync
        contract: uniswap_v2_pair
        batch: true
        name: Sync

on_pair_sync would then process a batch of events instead of a single event.
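For illustration, a batch-aware callback might receive all matched events at once as a sequence. This is a hypothetical sketch with stand-in types (the dataclasses below and the signature are mine, not DipDup's actual generated code):

```python
import asyncio
from dataclasses import dataclass


# Stand-ins for DipDup's generated types; illustrative only, not the real API.
@dataclass
class Sync:
    reserve0: int
    reserve1: int


@dataclass
class SubsquidEvent:
    payload: Sync


async def on_pair_sync(rows_out: list, events: tuple[SubsquidEvent, ...]) -> None:
    # With batch: true, the callback would see every matched event at once,
    # so a single bulk insert could replace N per-event inserts.
    rows = [
        {"reserve0": str(e.payload.reserve0), "reserve1": str(e.payload.reserve1)}
        for e in events
    ]
    rows_out.extend(rows)  # stand-in for Model.bulk_create(rows)
```

The key point is that the framework, not the handler, decides when the batch is full, so the handler body stays a plain bulk write.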

Why do you need this feature, what's the use case?

Most of my index handlers only insert raw events into a PostgreSQL table.

All processing logic is implemented in hooks.

A batch handler would speed up the indexing process.

Is there a workaround currently?

I'm experimenting with a batch sender that looks like this:

import asyncio
import logging

from dipdup.context import HandlerContext
from dipdup.models.evm_subsquid import SubsquidEvent
from indexer.models import EventsUniswapV2PairSync
from indexer.types.uniswap_v2_pair.evm_events.sync import Sync

logger = logging.getLogger("dipdup")

BATCH: list[EventsUniswapV2PairSync] = []


async def batch_insert(event: EventsUniswapV2PairSync):
    BATCH.append(event)
    batch_count = len(BATCH)

    # Flush to the database once the buffer reaches 100 rows
    if batch_count >= 100:
        await EventsUniswapV2PairSync.bulk_create(BATCH, ignore_conflicts=True)
        BATCH.clear()
        logger.info("%s Sync events inserted to the database", batch_count)


async def send_batch_on_shutdown():
    try:
        while True:
            await asyncio.sleep(60 * 60)
    finally:
        batch_count = len(BATCH)
        logger.info(
            "dipdup shutting down, inserting %s Sync events to the database...",
            batch_count,
        )
        if batch_count > 0:
            await EventsUniswapV2PairSync.bulk_create(
                BATCH, ignore_conflicts=True
            )
            logger.info(
                "dipdup shutting down, inserting %s Sync events to the database... DONE",
                batch_count,
            )
            BATCH.clear()


# Note: asyncio.create_task() requires a running event loop, so this relies
# on the module being imported while DipDup's loop is already running.
asyncio.create_task(send_batch_on_shutdown())  # noqa: RUF006

async def on_pair_sync(
    ctx: HandlerContext,
    event: SubsquidEvent[Sync],
) -> None:
    block_number = event.data.level
    block_timestamp = event.data.timestamp
    transaction_index = event.data.transaction_index
    log_index = event.data.log_index

    sync_id = f"{block_number}-{transaction_index}-{log_index}"
    pair_id = event.data.address.lower()
    reserve0 = str(event.payload.reserve0)
    reserve1 = str(event.payload.reserve1)

    # Use a new name instead of reassigning `event`, which shadows the parameter
    row = EventsUniswapV2PairSync(
        id=sync_id,
        block_number=block_number,
        block_timestamp=block_timestamp,
        transaction_index=transaction_index,
        log_index=log_index,
        pair_id=pair_id,
        reserve0=reserve0,
        reserve1=reserve1,
    )
    await batch_insert(event=row)
pyk changed the title from "Add batch_handler option in index" to "Add batch handler option in index" on Jan 20, 2024.
droserasprout added the feature (New feature or request) and community (Reported or suggested by our awesome users) labels on Jan 22, 2024.
droserasprout (Member) commented:
Hi @pyk!

I like your workaround, it's smart! We had already thought about batch processing, but haven't done any tests or benchmarks, so what follows is just speculation.

This task could likely be solved more efficiently at the framework level, since DipDup opens a new transaction for every level that has matched handlers, regardless of the number of SQL operations inside. Empty transactions may be optimized away somehow by Tortoise or by PostgreSQL, but they are definitely not free.

If you'd like to help with this task and dive into DipDup's guts, the entry point for experiments is the dipdup.transactions.TransactionsManager.in_transaction method. It should be patched to reuse the same transaction until the batch is full (or even flush on a time interval), similar to your handler-level solution.
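To make the idea concrete, here is a toy model of that patch: a manager that opens a real transaction only when none is pending and commits once enough levels have been processed. Everything below (the fake connection, the class and method names) is a hypothetical sketch of the approach, not DipDup's actual internals:

```python
import asyncio
import contextlib


class FakeConnection:
    """Counts opened transactions; stands in for a real DB connection."""

    def __init__(self) -> None:
        self.transactions_opened = 0


class BatchedTransactionManager:
    """Reuse one transaction across levels, committing every `batch_size` levels."""

    def __init__(self, conn: FakeConnection, batch_size: int = 100) -> None:
        self._conn = conn
        self._batch_size = batch_size
        self._pending = 0

    @contextlib.asynccontextmanager
    async def in_transaction(self):
        if self._pending == 0:
            # A real BEGIN would go here; we only count it.
            self._conn.transactions_opened += 1
        yield
        self._pending += 1
        if self._pending >= self._batch_size:
            # A real COMMIT would go here.
            self._pending = 0
```

Processing 250 levels with a batch size of 100 opens only 3 transactions instead of 250, which is where the speedup would come from. A real patch would also need to commit the partial batch on shutdown and roll back on errors.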

droserasprout self-assigned this on Jul 12, 2024 and added it to the 8.0.0b5 milestone on Aug 9, 2024.
droserasprout (Member) commented:

Implemented with a fixed batch size; follow-up in #1091.
