From decf5ec7ae0bae332225402b6087c907b9f33f27 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Wed, 30 Oct 2024 10:59:01 +0800 Subject: [PATCH] add write benchmark (#198) * add write benchmark * add write benchmark * add commit statement to sqlite * add async test * add wal disabled test --- bindings/python/pyproject.toml | 1 + bindings/python/tests/conftest.py | 34 +++ .../tests/test_write_async_benchmark.py | 210 ++++++++++++++++++ bindings/python/tests/test_write_benchmark.py | 124 +++++++++++ 4 files changed, 369 insertions(+) create mode 100644 bindings/python/tests/conftest.py create mode 100644 bindings/python/tests/test_write_async_benchmark.py create mode 100644 bindings/python/tests/test_write_benchmark.py diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index 795ca0d..5f3d20d 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -15,6 +15,7 @@ dynamic = ["version"] [project.optional-dependencies] test = ["pytest", "pytest-asyncio"] +bench = ["pytest", "pytest-asyncio", "pytest-benchmark", "duckdb"] docs = ["pdoc"] [tool.maturin] diff --git a/bindings/python/tests/conftest.py b/bindings/python/tests/conftest.py new file mode 100644 index 0000000..d793f88 --- /dev/null +++ b/bindings/python/tests/conftest.py @@ -0,0 +1,34 @@ +import asyncio +import random + +import pytest_asyncio + + +def gen_string(max_size): + size = gen_int(0, max_size) + charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + return ''.join(random.choices(charset, k=size)) + + +def gen_bytes(max_size): + return gen_string(max_size).encode("utf-8") + + +def gen_int(lower, high): + return random.randint(lower, high) + + +# async support for pytest-benchmark +# https://github.com/ionelmc/pytest-benchmark/issues/66#issuecomment-1137005280 +@pytest_asyncio.fixture +def aio_benchmark(benchmark, event_loop): + def _wrapper(func, *args, **kwargs): + if asyncio.iscoroutinefunction(func): + + @benchmark + def _(): + return event_loop.run_until_complete(func(*args, **kwargs)) + else: + benchmark(func, *args, **kwargs) + + return _wrapper diff --git a/bindings/python/tests/test_write_async_benchmark.py b/bindings/python/tests/test_write_async_benchmark.py new file mode 100644 index 0000000..1dd4191 --- /dev/null +++ b/bindings/python/tests/test_write_async_benchmark.py @@ -0,0 +1,210 @@ +import asyncio +import os +import tempfile +import duckdb +import sqlite3 +import pytest + +from conftest import gen_int, gen_string, gen_bytes +from tonbo import Record, Column, DataType, TonboDB, DbOption +from tonbo.fs import from_filesystem_path + +WRITE_TIME = 500000 + + +@Record +class User: + id = Column(DataType.Int64, name="id", primary_key=True) + name = Column(DataType.String, name="name") + email = Column(DataType.String, name="email", nullable=True) + age = Column(DataType.UInt16, name="age") + data = Column(DataType.Bytes, name="data") + + +async def duckdb_write(threads: int): + con = duckdb.connect(config={"threads": threads}) + con.sql( + "CREATE TABLE user (id INTEGER, name VARCHAR(20), email VARCHAR(20), age INTEGER, data VARCHAR(200))" + ) + + async def insert_task(start: int, end: int): + txn = con.begin() + for j in range(start, end): + txn.execute( + "INSERT INTO user VALUES (?, ?, ?, ?, ?)", + [i, gen_string(20), gen_string(20), gen_int(0, 0xffff), gen_bytes(200)], + ) + txn.commit() + + tasks = [] + for i in range(0, threads): + tasks.append(insert_task(i * WRITE_TIME // threads, (i + 1) * WRITE_TIME // threads)) + + await asyncio.gather(*tasks) + con.commit() + con.close() + + +async def tonbo_write(threads: int): + temp_dir = tempfile.TemporaryDirectory() + + option = DbOption(from_filesystem_path(temp_dir.name)) + + db = TonboDB(option, User()) + tasks = [] + + async def insert_task(start: int, end: int): + txn = await db.transaction() + for j in range(start, end): + txn.insert( + User( + id=j, + age=gen_int(0, 0xffff), + name=gen_string(20), + email=gen_string(20), + data=gen_bytes(200), + ) + ) + await txn.commit() + + for i in range(0, threads): + tasks.append(insert_task(i * WRITE_TIME // threads, (i + 1) * WRITE_TIME // threads)) + + await asyncio.gather(*tasks) + await db.flush_wal() + + +async def tonbo_write_disable_wal(threads: int): + temp_dir = tempfile.TemporaryDirectory() + + option = DbOption(from_filesystem_path(temp_dir.name)) + option.use_wal = False + + db = TonboDB(option, User()) + tasks = [] + + async def insert_task(start: int, end: int): + txn = await db.transaction() + for j in range(start, end): + txn.insert( + User( + id=j, + age=gen_int(0, 0xffff), + name=gen_string(20), + email=gen_string(20), + data=gen_bytes(200), + ) + ) + await txn.commit() + + for i in range(0, threads): + tasks.append(insert_task(i * WRITE_TIME // threads, (i + 1) * WRITE_TIME // threads)) + + await asyncio.gather(*tasks) + + +async def sqlite_write(threads: int): + file = tempfile.NamedTemporaryFile() + con = sqlite3.connect(file.name, check_same_thread=False) + con.execute( + "CREATE TABLE user (id INTEGER, name VARCHAR(20), email VARCHAR(20), age INTEGER, data VARCHAR(200))" + ) + + async def insert_task(start: int, end: int): + for j in range(start, end): + con.execute( + "INSERT INTO user VALUES (?, ?, ?, ?, ?)", + [j, gen_string(20), gen_string(20), gen_int(0, 0xffff), gen_bytes(200)], + ) + con.commit() + + tasks = [] + for i in range(0, threads): + tasks.append(insert_task(i * WRITE_TIME // threads, (i + 1) * WRITE_TIME // threads)) + await asyncio.gather(*tasks) + con.commit() + con.close() + + +@pytest.mark.parametrize("threads", [1]) +@pytest.mark.benchmark(group="1 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_duckdb_one_task(aio_benchmark, threads): + aio_benchmark(duckdb_write, threads) + + +@pytest.mark.parametrize("threads", [1]) +@pytest.mark.benchmark(group="1 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_tonbo_one_task(aio_benchmark, threads): + aio_benchmark(tonbo_write, threads) + + +@pytest.mark.parametrize("threads", [1]) +@pytest.mark.benchmark(group="1 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_tonbo_disable_wal_one_task(aio_benchmark, threads): + aio_benchmark(tonbo_write_disable_wal, threads) + + +@pytest.mark.parametrize("threads", [1]) +@pytest.mark.benchmark(group="1 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_sqlite_one_task(aio_benchmark, threads): + aio_benchmark(sqlite_write, threads) + + +@pytest.mark.parametrize("threads", [4]) +@pytest.mark.benchmark(group="4 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_duckdb_four_task(aio_benchmark, threads): + aio_benchmark(duckdb_write, threads) + + +@pytest.mark.parametrize("threads", [4]) +@pytest.mark.benchmark(group="4 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_tonbo_four_task(aio_benchmark, threads): + aio_benchmark(tonbo_write, threads) + + +@pytest.mark.parametrize("threads", [4]) +@pytest.mark.benchmark(group="4 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_tonbo_disable_wal_four_task(aio_benchmark, threads): + aio_benchmark(tonbo_write_disable_wal, threads) + + +@pytest.mark.parametrize("threads", [4]) +@pytest.mark.benchmark(group="4 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_sqlite_four_task(aio_benchmark, threads): + aio_benchmark(sqlite_write, threads) + + +@pytest.mark.parametrize("threads", [8]) +@pytest.mark.benchmark(group="8 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_duckdb_eight_task(aio_benchmark, threads): + aio_benchmark(duckdb_write, threads) + + +@pytest.mark.parametrize("threads", [8]) +@pytest.mark.benchmark(group="8 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_tonbo_eight_task(aio_benchmark, threads): + aio_benchmark(tonbo_write, threads) + + +@pytest.mark.parametrize("threads", [8]) +@pytest.mark.benchmark(group="8 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_tonbo_disable_wal_eight_task(aio_benchmark, threads): + aio_benchmark(tonbo_write_disable_wal, threads) + + +@pytest.mark.parametrize("threads", [8]) +@pytest.mark.benchmark(group="8 async task") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_sqlite_eight_task(aio_benchmark, threads): + aio_benchmark(sqlite_write, threads) diff --git a/bindings/python/tests/test_write_benchmark.py b/bindings/python/tests/test_write_benchmark.py new file mode 100644 index 0000000..0bc85b3 --- /dev/null +++ b/bindings/python/tests/test_write_benchmark.py @@ -0,0 +1,124 @@ +import os +import tempfile +import duckdb +import sqlite3 +import pytest + +from conftest import gen_string, gen_int, gen_bytes +from tonbo import Record, Column, DataType, TonboDB, DbOption +from tonbo.fs import from_filesystem_path + +WRITE_TIME = 500000 + + +@Record +class User: + id = Column(DataType.Int64, name="id", primary_key=True) + name = Column(DataType.String, name="name") + email = Column(DataType.String, name="email", nullable=True) + age = Column(DataType.UInt16, name="age") + data = Column(DataType.Bytes, name="data") + + +def duckdb_write(auto_commit: bool): + con = duckdb.connect() + con.sql( + "CREATE TABLE user (id INTEGER, name VARCHAR(20), email VARCHAR(20), age INTEGER, data VARCHAR(200))" + ) + if not auto_commit: + con.begin() + for i in range(0, WRITE_TIME): + con.execute( + "INSERT INTO user VALUES (?, ?, ?, ?, ?)", + [i, gen_string(20), gen_string(20), gen_int(0, 0xffff), gen_bytes(200)], + ) + if not auto_commit: + con.commit() + con.close() + + +async def tonbo_write(auto_commit: bool): + temp_dir = tempfile.TemporaryDirectory() + + option = DbOption(from_filesystem_path(temp_dir.name)) + + db = TonboDB(option, User()) + if auto_commit: + for i in range(0, WRITE_TIME): + await db.insert(User( + id=i, + age=gen_int(0, 0xffff), + name=gen_string(20), + email=gen_string(20), + data=gen_bytes(200), + )) + else: + txn = await db.transaction() + for i in range(0, WRITE_TIME): + txn.insert(User( + id=i, + age=gen_int(0, 0xffff), + name=gen_string(20), + email=gen_string(20), + data=gen_bytes(200), + )) + await txn.commit() + + await db.flush_wal() + + +def sqlite_write(auto_commit: bool): + file = tempfile.NamedTemporaryFile() + con = sqlite3.connect(file.name, autocommit=auto_commit) + con.execute( + "CREATE TABLE user (id INTEGER, name VARCHAR(20), email VARCHAR(20), age INTEGER, data VARCHAR(200))" + ) + for i in range(0, WRITE_TIME): + con.execute( + "INSERT INTO user VALUES (?, ?, ?, ?, ?)", + [i, gen_string(20), gen_string(20), gen_int(0, 0xffff), gen_bytes(200)], + ) + con.commit() + con.close() + + +@pytest.mark.parametrize("auto_commit", [True]) +@pytest.mark.benchmark(group="autocommit") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_duckdb_autocommit(benchmark, auto_commit): + benchmark(duckdb_write, auto_commit) + + +@pytest.mark.parametrize("auto_commit", [False]) +@pytest.mark.benchmark(group="txn") +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +def test_duckdb(benchmark, auto_commit): + benchmark(duckdb_write, auto_commit) + + +@pytest.mark.parametrize("auto_commit", [False]) +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +@pytest.mark.benchmark(group="txn") +def test_tonbo(aio_benchmark, auto_commit): + aio_benchmark(tonbo_write, auto_commit) + + +@pytest.mark.parametrize("auto_commit", [True]) +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +@pytest.mark.benchmark(group="autocommit") +def test_tonbo_no_txn(aio_benchmark, auto_commit): + aio_benchmark(tonbo_write, auto_commit) + + +@pytest.mark.parametrize("auto_commit", [True]) +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +@pytest.mark.benchmark(group="autocommit") +def test_sqlite_autocommit(benchmark, auto_commit): + benchmark(sqlite_write, auto_commit) + + +@pytest.mark.parametrize("auto_commit", [False]) +@pytest.mark.skipif("BENCH" not in os.environ, reason="benchmark") +@pytest.mark.benchmark(group="txn") +def test_sqlite(benchmark, auto_commit): + benchmark(sqlite_write, auto_commit)