Skip to content

Commit

Permalink
feat(clickhouse): support reading parquet and csv globs
Browse files (browse the repository at this point in the history)
  • Loading branch information
cpcloud committed Aug 21, 2023
1 parent 1bda3bd commit 4ea1834
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import ast
+ import glob
import json
from contextlib import closing, suppress
from functools import partial
@@ -548,22 +549,28 @@ def read_parquet(
self,
path: str | Path,
table_name: str | None = None,
- engine: str = "File(Parquet)",
+ engine: str = "File(Native)",
**kwargs: Any,
) -> ir.Table:
- import pyarrow.parquet as pq
+ import pyarrow.dataset as ds
from clickhouse_connect.driver.tools import insert_file

from ibis.formats.pyarrow import PyArrowSchema

- schema = PyArrowSchema.to_ibis(pq.read_metadata(path).schema.to_arrow_schema())
+ paths = list(glob.glob(str(path)))
+ schema = PyArrowSchema.to_ibis(ds.dataset(paths, format="parquet").schema)

name = table_name or util.gen_name("read_parquet")
table = self.create_table(name, engine=engine, schema=schema, temp=True)

- insert_file(
- client=self.con, table=name, file_path=str(path), fmt="Parquet", **kwargs
- )
+ for file_path in paths:
+ insert_file(
+ client=self.con,
+ table=name,
+ file_path=file_path,
+ fmt="Parquet",
+ **kwargs,
+ )
return table

def read_csv(
@@ -573,18 +580,19 @@ def read_csv(
engine: str = "File(Native)",
**kwargs: Any,
) -> ir.Table:
- import pyarrow.csv as pac
+ import pyarrow.dataset as ds
from clickhouse_connect.driver.tools import insert_file

from ibis.formats.pyarrow import PyArrowSchema

- with pac.open_csv(path) as f:
- schema = PyArrowSchema.to_ibis(f.schema)
+ paths = list(glob.glob(str(path)))
+ schema = PyArrowSchema.to_ibis(ds.dataset(paths, format="csv").schema)

name = table_name or util.gen_name("read_csv")
table = self.create_table(name, engine=engine, schema=schema, temp=True)

- insert_file(client=self.con, table=name, file_path=str(path), **kwargs)
+ for file_path in paths:
+ insert_file(client=self.con, table=name, file_path=file_path, **kwargs)
return table

def create_table(
Expand Down

0 comments on commit 4ea1834

Please sign in to comment.