Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 9, 2024
1 parent d0398f3 commit 8c312e3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
61 changes: 61 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -542,3 +543,63 @@ func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) {
c.logger.Info("[clickhouse] version", slog.Any("version", clickhouseVersion.DisplayName))
return clickhouseVersion.Version.String(), nil
}

func (c *ClickHouseConnector) getTableSchemaForTable(ctx context.Context, tableName string) (*protos.TableSchema, error) {
// TODO sanitize
q, err := c.database.Query(ctx, fmt.Sprintf("select * from %s limit 0", tableName))
if err != nil {
return nil, err
}

columns := q.ColumnTypes()
colFields := make([]*protos.FieldDescription, 0, len(columns))
for _, column := range columns {
var qkind qvalue.QValueKind
switch column.DatabaseTypeName() {
case "String", "Nullable(String)":
qkind = qvalue.QValueKindString
case "Int32", "Nullable(Int32)":
qkind = qvalue.QValueKindInt32
case "DateTime64(6)", "Nullable(DateTime64(6))":
qkind = qvalue.QValueKindTimestamp
case "Date32", "Nullable(Date32)":
qkind = qvalue.QValueKindDate
default:
if strings.Contains(column.DatabaseTypeName(), "Decimal") {
qkind = qvalue.QValueKindNumeric
} else {
return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.DatabaseTypeName())
}
}

colFields = append(colFields, &protos.FieldDescription{
Name: column.Name(),
Type: string(qkind),
TypeModifier: -1,
})
}

return &protos.TableSchema{
TableIdentifier: tableName,
Columns: colFields,
System: protos.TypeSystem_Q,
}, nil
}

func (c *ClickHouseConnector) GetTableSchema(
ctx context.Context,
_env map[string]string,
_system protos.TypeSystem,
tableIdentifiers []string,
) (map[string]*protos.TableSchema, error) {
res := make(map[string]*protos.TableSchema, len(tableIdentifiers))
for _, tableName := range tableIdentifiers {
tableSchema, err := c.getTableSchemaForTable(ctx, tableName)
if err != nil {
return nil, err
}
res[tableName] = tableSchema
}

return res, nil
}
1 change: 1 addition & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ var (

_ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
_ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
_ GetTableSchemaConnector = &connclickhouse.ClickHouseConnector{}

_ NormalizedTablesConnector = &connpostgres.PostgresConnector{}
_ NormalizedTablesConnector = &connbigquery.BigQueryConnector{}
Expand Down

0 comments on commit 8c312e3

Please sign in to comment.