Memory leaks AsyncClient #424

Open
ZwickVitaly opened this issue Nov 17, 2024 · 1 comment
Labels: bug (Something isn't working), need info (Please provide additional information)

Comments

@ZwickVitaly

ZwickVitaly commented Nov 17, 2024

Describe the bug

I need to write a massive amount of data every day (about 2 billion rows per 24 hours).
The application makes 1 million requests per city (4 cities).
Every request returns 500 rows of info.
Every 4 requests I write the rows to the ClickHouse DB using clickhouse-connect -> 2k products per write.
There are 8 Celery processes, each handling a 125k-request share of the total.
Each process runs 4 requests with asyncio.gather -> gets 4 lists of 500 rows and combines them into one 2k-row list.
Then client.insert is called.

Every 10 minutes swap memory increases by ~50 MB, BUT the database's swap usage always stays at the same level (70 MB).
smem shows that it is the Celery processes that grow in swap memory.

I was worried that aiohttp was leaking, but with the insert removed and only the HTTP requests running, memory does not grow at all.

So the problem is the insert.

Steps to reproduce

  1. Run a process that writes something with AsyncClient.insert in a loop (a minimal sketch follows below)
  2. Wait
  3. Watch swap memory grow
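
A minimal sketch of such a loop looks roughly like this (the mem_leak_test table and its schema are placeholders for illustration, not my production table):

import asyncio

import clickhouse_connect


async def main():
    # Placeholder table, just to drive AsyncClient.insert in a loop
    client = await clickhouse_connect.get_async_client(host="localhost")
    await client.command(
        "CREATE TABLE IF NOT EXISTS mem_leak_test (id UInt32, val String) "
        "ENGINE = MergeTree ORDER BY id"
    )
    rows = [[i, "x" * 100] for i in range(2000)]  # roughly one production-sized batch
    while True:
        await client.insert("mem_leak_test", rows, column_names=["id", "val"])
        await asyncio.sleep(0.1)  # watch process RSS/swap with smem while this runs


asyncio.run(main())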

Expected behaviour

No swap memory growth

It seems to me that the client does not clear QueryResult or QueryResultSummary properly, but I am not smart enough to figure it out.
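
A rough way to check that guess between insert batches (standard library only; the class names below are just the ones I suspect, matched as plain strings):

import gc


def count_suspect_objects():
    # Count live objects whose type name matches the suspected driver classes.
    # The names are guesses from above, not verified against the driver source.
    suspects = ("QueryResult", "QueryResultSummary")
    gc.collect()
    return sum(1 for obj in gc.get_objects() if type(obj).__name__ in suspects)

# If this number keeps growing between batches, the driver is holding on to results.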

Code example

I made a custom async context manager for getting a connection:

from contextlib import asynccontextmanager

import clickhouse_connect
from clickhouse_connect.driver.asyncclient import AsyncClient


@asynccontextmanager
async def get_async_connection() -> AsyncClient:
    # Opens a fresh client for the duration of the block, then closes it
    session = AsyncSession(CLICKHOUSE_CONFING)
    async with session as client:
        yield client


class AsyncSession:

    def __init__(self, clickhouse_config):
        self.config = clickhouse_config

    async def __aenter__(self) -> AsyncClient:
        self.client = await clickhouse_connect.get_async_client(**self.config)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # AsyncClient.close() is a coroutine, so it should be awaited
        await self.client.close()

This is the request function:

import asyncio

from aiohttp import ClientSession


async def get_city_result(city, date, requests, request_batch_no, client=None):
    requests_list = [r for r in requests if not r[1].isdigit() or "javascript" not in r[1]]
    batch_size = 4
    prev = 0
    async with ClientSession() as http_session:
        async with get_async_connection() as client:
            for batch_i in range(batch_size, len(requests_list) + 1, batch_size):
                request_batch = requests_list[prev:batch_i]
                requests_tasks = [
                    asyncio.create_task(
                        get_r_data(
                            r=r,
                            city=city,
                            date=date,
                            http_session=http_session,
                        )
                    )
                    for r in request_batch
                ]
                # 4 concurrent HTTP requests -> 4 lists of ~500 rows each
                product_batches: tuple = await asyncio.gather(*requests_tasks)
                prev = batch_i
                full_res = []
                for batch in product_batches:
                    full_res.extend(batch)
                # ~2k combined rows per insert
                await save_to_db(
                    items=full_res,
                    table="request_product",
                    fields=["product", "city", "date", "query", "place", "advert", "natural_place", "cpm"],
                    client=client
                )

And this is the save-to-DB function:

async def save_to_db(items, table, fields, client: AsyncClient):
    if items:
        try:
            await client.insert(table, items, column_names=fields)
        except Exception as e:
            logger.critical(f"{e}, {items}")

Yes, I also tried this:

async def save_to_db(items, table, fields):
    if items:
        try:
            async with get_async_connection() as client:
                await client.insert(table, items, column_names=fields)
        except Exception as e:
            logger.critical(f"{e}, {items}")

That made it worse.

Configuration

Environment

  • clickhouse-connect version: 0.8.6
  • Python version: 3.12
  • Operating system: docker (python:3.12-alpine)

ClickHouse server

  • ClickHouse Server version: 22.1
  • CREATE TABLE statements for tables involved:
client.command('''CREATE TABLE IF NOT EXISTS request_product (
                        product UInt32 CODEC(LZ4HC),
                        city UInt8 CODEC(LZ4HC),
                        date UInt16 CODEC(LZ4HC),
                        query UInt32 CODEC(LZ4HC),
                        place UInt16 CODEC(LZ4HC),
                        advert FixedString(1) CODEC(LZ4HC),
                        natural_place UInt16 CODEC(LZ4HC),
                        cpm UInt16 DEFAULT 0 CODEC(LZ4HC)
                    ) ENGINE = MergeTree()
                    PRIMARY KEY (product, city, date)
                    ORDER BY (product, city, date, query);''')
ZwickVitaly added the bug label on Nov 17, 2024
@genzgd
Collaborator

genzgd commented Nov 17, 2024

I've not seen evidence of memory leaks in some of our long-running tests, so it's difficult to debug this outside of your environment. Accordingly, I'd ask you to try a couple of things to help narrow down the issue.

First, could you try a version with just the regular client (something like the sketch below)? I assume it would have lower throughput, but it would help eliminate the async handling as the issue.
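
For example, a rough sketch of the same save path with the synchronous client, reusing the config dict and field handling from your snippets above:

import clickhouse_connect

# Sketch only: synchronous client, same config dict as in the snippets above
client = clickhouse_connect.get_client(**CLICKHOUSE_CONFING)


def save_to_db_sync(items, table, fields):
    if items:
        client.insert(table, items, column_names=fields)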

Second, could you do some profiling with cProfile/memory_profiler? This is not a great article but it should get you started: https://www.kdnuggets.com/introduction-to-memory-profiling-in-python
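
If memory_profiler is a hassle to set up, a quick tracemalloc sketch (standard library, just comparing snapshots around a run of insert batches) might already show where the growth comes from:

import tracemalloc

tracemalloc.start()
baseline = tracemalloc.take_snapshot()

# ... run a number of insert batches here ...

current = tracemalloc.take_snapshot()
for stat in current.compare_to(baseline, "lineno")[:10]:
    print(stat)  # top allocation growth, attributed to source lines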

genzgd added the need info label on Nov 18, 2024