From 8c312e3154d6fc4ea2f40b1490cd29c64bca8f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 9 Dec 2024 17:19:06 +0000 Subject: [PATCH 1/3] wip --- flow/connectors/clickhouse/clickhouse.go | 61 ++++++++++++++++++++++++ flow/connectors/core.go | 1 + 2 files changed, 62 insertions(+) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index bf7e1b4c9..8e43dbfb1 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -23,6 +23,7 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -542,3 +543,63 @@ func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) { c.logger.Info("[clickhouse] version", slog.Any("version", clickhouseVersion.DisplayName)) return clickhouseVersion.Version.String(), nil } + +func (c *ClickHouseConnector) getTableSchemaForTable(ctx context.Context, tableName string) (*protos.TableSchema, error) { + // TODO sanitize + q, err := c.database.Query(ctx, fmt.Sprintf("select * from %s limit 0", tableName)) + if err != nil { + return nil, err + } + + columns := q.ColumnTypes() + colFields := make([]*protos.FieldDescription, 0, len(columns)) + for _, column := range columns { + var qkind qvalue.QValueKind + switch column.DatabaseTypeName() { + case "String", "Nullable(String)": + qkind = qvalue.QValueKindString + case "Int32", "Nullable(Int32)": + qkind = qvalue.QValueKindInt32 + case "DateTime64(6)", "Nullable(DateTime64(6))": + qkind = qvalue.QValueKindTimestamp + case "Date32", "Nullable(Date32)": + qkind = qvalue.QValueKindDate + default: + if strings.Contains(column.DatabaseTypeName(), "Decimal") { + qkind = qvalue.QValueKindNumeric + } else { + return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.DatabaseTypeName()) + } + } + + colFields = append(colFields, &protos.FieldDescription{ + Name: column.Name(), + Type: string(qkind), + TypeModifier: -1, + }) + } + + return &protos.TableSchema{ + TableIdentifier: tableName, + Columns: colFields, + System: protos.TypeSystem_Q, + }, nil +} + +func (c *ClickHouseConnector) GetTableSchema( + ctx context.Context, + _env map[string]string, + _system protos.TypeSystem, + tableIdentifiers []string, +) (map[string]*protos.TableSchema, error) { + res := make(map[string]*protos.TableSchema, len(tableIdentifiers)) + for _, tableName := range tableIdentifiers { + tableSchema, err := c.getTableSchemaForTable(ctx, tableName) + if err != nil { + return nil, err + } + res[tableName] = tableSchema + } + + return res, nil +} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 0991a5097..afdf24494 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -470,6 +470,7 @@ var ( _ GetTableSchemaConnector = &connpostgres.PostgresConnector{} _ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{} + _ GetTableSchemaConnector = &connclickhouse.ClickHouseConnector{} _ NormalizedTablesConnector = &connpostgres.PostgresConnector{} _ NormalizedTablesConnector = &connbigquery.BigQueryConnector{} From a1b29ccc2963d54d8da4c1c24fa5f642d78fda28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 9 Dec 2024 18:39:12 +0000 Subject: [PATCH 2/3] don't duplicate mapping ch ColumnType to QValueKind --- flow/connectors/clickhouse/clickhouse.go | 18 ++++++------- flow/e2e/clickhouse/clickhouse.go | 32 ++++++++---------------- 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 8e43dbfb1..432a650b4 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -544,14 +544,7 @@ func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) { return clickhouseVersion.Version.String(), nil } -func (c *ClickHouseConnector) getTableSchemaForTable(ctx context.Context, tableName string) (*protos.TableSchema, error) { - // TODO sanitize - q, err := c.database.Query(ctx, fmt.Sprintf("select * from %s limit 0", tableName)) - if err != nil { - return nil, err - } - - columns := q.ColumnTypes() +func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*protos.TableSchema, error) { colFields := make([]*protos.FieldDescription, 0, len(columns)) for _, column := range columns { var qkind qvalue.QValueKind @@ -576,6 +569,7 @@ func (c *ClickHouseConnector) getTableSchemaForTable(ctx context.Context, tableN Name: column.Name(), Type: string(qkind), TypeModifier: -1, + Nullable: column.Nullable(), }) } @@ -594,7 +588,13 @@ func (c *ClickHouseConnector) GetTableSchema( ) (map[string]*protos.TableSchema, error) { res := make(map[string]*protos.TableSchema, len(tableIdentifiers)) for _, tableName := range tableIdentifiers { - tableSchema, err := c.getTableSchemaForTable(ctx, tableName) + rows, err := c.database.Query(ctx, fmt.Sprintf("select * from %s limit 0", tableName)) + if err != nil { + return nil, err + } + + tableSchema, err := GetTableSchemaForTable(tableName, rows.ColumnTypes()) + rows.Close() if err != nil { return nil, err } diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 975676152..a36a5335b 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -96,6 +96,7 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch if err != nil { return nil, err } + defer ch.Close() rows, err := ch.Query( context.Background(), @@ -104,36 +105,25 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch if err != nil { return nil, err } + defer rows.Close() batch := &model.QRecordBatch{} types := rows.ColumnTypes() row := make([]interface{}, 0, len(types)) - for _, ty := range types { - nullable := ty.Nullable() + tableSchema, err := connclickhouse.GetTableSchemaForTable(table, types) + if err != nil { + return nil, err + } + + for idx, ty := range types { + fieldDesc := tableSchema.Columns[idx] row = append(row, reflect.New(ty.ScanType()).Interface()) - var qkind qvalue.QValueKind - switch ty.DatabaseTypeName() { - case "String", "Nullable(String)": - qkind = qvalue.QValueKindString - case "Int32", "Nullable(Int32)": - qkind = qvalue.QValueKindInt32 - case "DateTime64(6)", "Nullable(DateTime64(6))": - qkind = qvalue.QValueKindTimestamp - case "Date32", "Nullable(Date32)": - qkind = qvalue.QValueKindDate - default: - if strings.Contains(ty.DatabaseTypeName(), "Decimal") { - qkind = qvalue.QValueKindNumeric - } else { - return nil, fmt.Errorf("failed to resolve QValueKind for %s", ty.DatabaseTypeName()) - } - } batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{ Name: ty.Name(), - Type: qkind, + Type: qvalue.QValueKind(fieldDesc.Type), Precision: 0, Scale: 0, - Nullable: nullable, + Nullable: fieldDesc.Nullable, }) } From 13502999741c207b2069a6046ed83ab72f78db70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 9 Dec 2024 18:56:38 +0000 Subject: [PATCH 3/3] more types --- flow/connectors/clickhouse/clickhouse.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 432a650b4..d99462042 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -551,8 +551,16 @@ func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*pro switch column.DatabaseTypeName() { case "String", "Nullable(String)": qkind = qvalue.QValueKindString + case "Bool", "Nullable(Bool)": + qkind = qvalue.QValueKindBoolean + case "Int16", "Nullable(Int16)": + qkind = qvalue.QValueKindInt16 case "Int32", "Nullable(Int32)": qkind = qvalue.QValueKindInt32 + case "Int64", "Nullable(Int64)": + qkind = qvalue.QValueKindInt64 + case "UUID", "Nullable(UUID)": + qkind = qvalue.QValueKindUUID case "DateTime64(6)", "Nullable(DateTime64(6))": qkind = qvalue.QValueKindTimestamp case "Date32", "Nullable(Date32)":