Skip to content

Commit

Permalink
added sink for clickhouse (#298)
Browse files Browse the repository at this point in the history
  • Loading branch information
roopeshsn authored Nov 13, 2023
1 parent 353f500 commit b30151f
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,6 @@ dmypy.json

# local vscode config
.vscode/

# local pycharm config
.idea/
7 changes: 7 additions & 0 deletions buildflow/core/types/clickhouse_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Clickhouse

ClickhouseDatabase = str
ClickhouseTableID = str
ClickhouseHost = str
ClickhouseUsername = str
ClickhousePassword = str
2 changes: 2 additions & 0 deletions buildflow/io/clickhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# ruff: noqa
from .clickhouse import ClickhouseTable
65 changes: 65 additions & 0 deletions buildflow/io/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import dataclasses
from typing import Optional, Type

from buildflow.config.cloud_provider_config import LocalOptions
from buildflow.core.credentials import EmptyCredentials
from buildflow.core.types.clickhouse_types import (
ClickhouseDatabase,
ClickhouseHost,
ClickhousePassword,
ClickhouseTableID,
ClickhouseUsername,
)
from buildflow.io.clickhouse.strategies.clickhouse_strategies import ClickhouseSink
from buildflow.io.primitive import LocalPrimtive, Primitive

_DEFAULT_DESTROY_PROTECTION = False
_DEFAULT_BATCH_SIZE = 10_000


@dataclasses.dataclass
class ClickhouseTable(LocalPrimtive):
database: ClickhouseDatabase
table: ClickhouseTableID
schema: Optional[Type] = dataclasses.field(default=None, init=False)
# Arguments for authentication
host: ClickhouseHost
username: ClickhouseUsername
password: ClickhousePassword

def options(
self,
# Pulumi management options
destroy_protection: bool = _DEFAULT_DESTROY_PROTECTION,
schema: Optional[Type] = None,
# Sink options
batch_size: int = _DEFAULT_BATCH_SIZE,
) -> Primitive:
self.destroy_protection = destroy_protection
self.batch_size = batch_size
self.schema = schema
return self

@classmethod
def from_local_options(
cls,
local_options: LocalOptions,
*,
host: ClickhouseHost,
username: ClickhouseUsername,
password: ClickhousePassword,
database: ClickhouseDatabase,
table: ClickhouseTableID,
) -> "LocalPrimtive":
"""Create a primitive from LocalOptions."""
return cls(host, username, password, database, table)

def sink(self, credentials: EmptyCredentials):
return ClickhouseSink(
credentials=credentials,
host=self.host,
username=self.username,
password=self.password,
database=self.database,
table=self.table,
)
Empty file.
83 changes: 83 additions & 0 deletions buildflow/io/clickhouse/strategies/clickhouse_strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import asyncio
import logging
from typing import Any, Callable, Dict, Iterable, Type

import clickhouse_connect
import pandas as pd

from buildflow.core.credentials import EmptyCredentials
from buildflow.core.types.clickhouse_types import (
ClickhouseDatabase,
ClickhouseHost,
ClickhousePassword,
ClickhouseTableID,
ClickhouseUsername,
)
from buildflow.io.strategies.sink import SinkStrategy
from buildflow.io.utils.schemas import converters

_MAX_CONNECT_TRIES = 25


class ClickhouseSink(SinkStrategy):
def __init__(
self,
*,
credentials: EmptyCredentials,
host: ClickhouseHost,
username: ClickhouseUsername,
password: ClickhousePassword,
database: ClickhouseDatabase,
table: ClickhouseTableID,
):
super().__init__(credentials=credentials, strategy_id="clickhouse-sink")
self.client = None
self.database = database
self.table = table
self.host = host
self.username = username
self.password = password

async def connect(self):
connect_tries = 0
while connect_tries < _MAX_CONNECT_TRIES:
try:
self.client = clickhouse_connect.get_client(
host=self.host,
username=self.username,
password=self.password,
)
self.client.command(
f"CREATE DATABASE IF NOT EXISTS {self.database} ENGINE = Memory"
)
self.client.command(f'USE "{self.database}"')
break
except clickhouse_connect.driver.exceptions.Error:
logging.exception("failed to connect to clickhouse database")
connect_tries += 1
if connect_tries == _MAX_CONNECT_TRIES:
raise ValueError(
"failed to connect to clickhouse database. did you leave a "
"connection open?"
)
else:
logging.warning(
"can't concurrently write to Clickhouse "
"waiting 2 seconds then will "
"try again"
)
await asyncio.sleep(2)

def push_converter(
self, user_defined_type: Type
) -> Callable[[Any], Dict[str, Any]]:
return converters.json_push_converter(user_defined_type)

async def push(self, batch: Iterable[Dict[str, Any]]):
if self.client is None:
await self.connect()
df = pd.DataFrame(batch)
try:
self.client.insert_df(self.table, df)
except clickhouse_connect.driver.exceptions.DataError:
logging.exception("failed to connect to clickhouse database")
62 changes: 62 additions & 0 deletions buildflow/io/clickhouse/strategies/clickhouse_strategies_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
import unittest
from unittest.mock import Mock, patch

import clickhouse_connect
import pandas as pd
import pytest

from buildflow.io.clickhouse.strategies import clickhouse_strategies


@pytest.mark.usefixtures("event_loop_instance")
class ClickhouseStrategiesTest(unittest.TestCase):
def get_async_result(self, coro):
"""Run a coroutine synchronously."""
return self.event_loop.run_until_complete(coro)

def tearDown(self) -> None:
try:
os.remove(self.db)
except FileNotFoundError:
pass

def setUp(self) -> None:
self.host = "localhost"
self.username = "default"
self.password = ""
self.db = "default"
self.table = "default"
self.sink = clickhouse_strategies.ClickhouseSink(
credentials=None,
host=self.host,
username=self.username,
password=self.password,
database=self.db,
table=self.table,
)

@patch("clickhouse_connect.get_client")
def test_clickhouse_push_base(self, mock_get_client):
data = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]

# Mocking clickhouse_connect.get_client()
mock_client = Mock()
mock_client.query_df.return_value = pd.DataFrame(data)
mock_client.query_df.to_dict.return_value = data
mock_get_client.return_value = mock_client

# Mocking self.sink.push(data)
with patch.object(self.sink, "push", return_value=None) as mock_push:
self.get_async_result(self.sink.push(data))

client = clickhouse_connect.get_client()
result_df = client.query_df(f'SELECT * from "{self.table}"')
self.assertEqual(result_df.to_dict("records"), data)

# Verify that self.sink.push was called with the expected data
mock_push.assert_called_with(data)


if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies = [
"cloud-sql-python-connector",
"dacite",
"duckdb",
"clickhouse-connect",
"gcsfs",
"google-api-python-client",
"google-auth",
Expand Down

0 comments on commit b30151f

Please sign in to comment.