Skip to content

Commit

Permalink
ch: implement GetTableSchemaConnector (#2337)
Browse files Browse the repository at this point in the history
for generic schema changes e2e, alternative to #2114
  • Loading branch information
serprex authored Dec 9, 2024
1 parent d0398f3 commit 2b96245
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 21 deletions.
69 changes: 69 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -542,3 +543,71 @@ func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) {
c.logger.Info("[clickhouse] version", slog.Any("version", clickhouseVersion.DisplayName))
return clickhouseVersion.Version.String(), nil
}

func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*protos.TableSchema, error) {
colFields := make([]*protos.FieldDescription, 0, len(columns))
for _, column := range columns {
var qkind qvalue.QValueKind
switch column.DatabaseTypeName() {
case "String", "Nullable(String)":
qkind = qvalue.QValueKindString
case "Bool", "Nullable(Bool)":
qkind = qvalue.QValueKindBoolean
case "Int16", "Nullable(Int16)":
qkind = qvalue.QValueKindInt16
case "Int32", "Nullable(Int32)":
qkind = qvalue.QValueKindInt32
case "Int64", "Nullable(Int64)":
qkind = qvalue.QValueKindInt64
case "UUID", "Nullable(UUID)":
qkind = qvalue.QValueKindUUID
case "DateTime64(6)", "Nullable(DateTime64(6))":
qkind = qvalue.QValueKindTimestamp
case "Date32", "Nullable(Date32)":
qkind = qvalue.QValueKindDate
default:
if strings.Contains(column.DatabaseTypeName(), "Decimal") {
qkind = qvalue.QValueKindNumeric
} else {
return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.DatabaseTypeName())
}
}

colFields = append(colFields, &protos.FieldDescription{
Name: column.Name(),
Type: string(qkind),
TypeModifier: -1,
Nullable: column.Nullable(),
})
}

return &protos.TableSchema{
TableIdentifier: tableName,
Columns: colFields,
System: protos.TypeSystem_Q,
}, nil
}

func (c *ClickHouseConnector) GetTableSchema(
ctx context.Context,
_env map[string]string,
_system protos.TypeSystem,
tableIdentifiers []string,
) (map[string]*protos.TableSchema, error) {
res := make(map[string]*protos.TableSchema, len(tableIdentifiers))
for _, tableName := range tableIdentifiers {
rows, err := c.database.Query(ctx, fmt.Sprintf("select * from %s limit 0", tableName))
if err != nil {
return nil, err
}

tableSchema, err := GetTableSchemaForTable(tableName, rows.ColumnTypes())
rows.Close()
if err != nil {
return nil, err
}
res[tableName] = tableSchema
}

return res, nil
}
1 change: 1 addition & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ var (

_ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
_ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
_ GetTableSchemaConnector = &connclickhouse.ClickHouseConnector{}

_ NormalizedTablesConnector = &connpostgres.PostgresConnector{}
_ NormalizedTablesConnector = &connbigquery.BigQueryConnector{}
Expand Down
32 changes: 11 additions & 21 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
if err != nil {
return nil, err
}
defer ch.Close()

rows, err := ch.Query(
context.Background(),
Expand All @@ -104,36 +105,25 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
if err != nil {
return nil, err
}
defer rows.Close()

batch := &model.QRecordBatch{}
types := rows.ColumnTypes()
row := make([]interface{}, 0, len(types))
for _, ty := range types {
nullable := ty.Nullable()
tableSchema, err := connclickhouse.GetTableSchemaForTable(table, types)
if err != nil {
return nil, err
}

for idx, ty := range types {
fieldDesc := tableSchema.Columns[idx]
row = append(row, reflect.New(ty.ScanType()).Interface())
var qkind qvalue.QValueKind
switch ty.DatabaseTypeName() {
case "String", "Nullable(String)":
qkind = qvalue.QValueKindString
case "Int32", "Nullable(Int32)":
qkind = qvalue.QValueKindInt32
case "DateTime64(6)", "Nullable(DateTime64(6))":
qkind = qvalue.QValueKindTimestamp
case "Date32", "Nullable(Date32)":
qkind = qvalue.QValueKindDate
default:
if strings.Contains(ty.DatabaseTypeName(), "Decimal") {
qkind = qvalue.QValueKindNumeric
} else {
return nil, fmt.Errorf("failed to resolve QValueKind for %s", ty.DatabaseTypeName())
}
}
batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{
Name: ty.Name(),
Type: qkind,
Type: qvalue.QValueKind(fieldDesc.Type),
Precision: 0,
Scale: 0,
Nullable: nullable,
Nullable: fieldDesc.Nullable,
})
}

Expand Down

0 comments on commit 2b96245

Please sign in to comment.