Skip to content

Commit

Permalink
drafts on a plane
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 11, 2024
1 parent cb79f49 commit 8297d83
Show file tree
Hide file tree
Showing 8 changed files with 348 additions and 279 deletions.
32 changes: 18 additions & 14 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package connclickhouse

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"strings"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/ch-go"
chproto "github.com/ClickHouse/ch-go/proto"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -19,7 +17,7 @@ import (
)

const (
checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;`
checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;`
dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;`
)

Expand All @@ -29,17 +27,23 @@ func (c *ClickHouseConnector) getRawTableName(flowJobName string) string {
}

func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) {
var result sql.NullInt32
err := c.queryRow(ctx, checkIfTableExistsSQL, databaseName, tableIdentifier).Scan(&result)
if err != nil {
// TODO escape
var existsC chproto.ColBool
if err := c.query(ctx, ch.Query{
Body: fmt.Sprintf(checkIfTableExistsSQL, "'"+databaseName+"'", "'"+tableIdentifier+"'"),
Result: chproto.Results{
{Name: "table_exists", Data: &existsC},
},
OnResult: func(ctx context.Context, block chproto.Block) error {
if block.Rows != 1 {
return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows)
}
return nil
},
}); err != nil {
return false, fmt.Errorf("error while reading result row: %w", err)
}

if !result.Valid {
return false, errors.New("[clickhouse] checkIfTableExists: result is not valid")
}

return result.Int32 == 1, nil
return existsC[0], nil
}

func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
Expand Down
195 changes: 83 additions & 112 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/ch-go"
chproto "github.com/ClickHouse/ch-go/proto"
"github.com/aws/aws-sdk-go-v2/aws"
"go.temporal.io/sdk/log"
"golang.org/x/mod/semver"
Expand All @@ -30,7 +30,7 @@ import (

type ClickHouseConnector struct {
*metadataStore.PostgresMetadata
database clickhouse.Conn
database *ch.Client
logger log.Logger
config *protos.ClickhouseConfig
credsProvider *utils.ClickHouseS3Credentials
Expand Down Expand Up @@ -183,22 +183,12 @@ func NewClickHouseConnector(
// This is the minimum version of ClickHouse that actually supports session token
// https://github.com/ClickHouse/ClickHouse/issues/61230
minSupportedClickHouseVersion := "v24.3.1"
clickHouseVersionRow := database.QueryRow(ctx, "SELECT version()")
var clickHouseVersion string
err := clickHouseVersionRow.Scan(&clickHouseVersion)
if err != nil {
return nil, fmt.Errorf("failed to query ClickHouse version: %w", err)
}
// Ignore everything after patch version and prefix with "v", else semver.Compare will fail
versionParts := strings.SplitN(clickHouseVersion, ".", 4)
if len(versionParts) > 3 {
versionParts = versionParts[:3]
}
cleanedClickHouseVersion := "v" + strings.Join(versionParts, ".")
serverInfo := database.ServerInfo()
cleanedClickHouseVersion := fmt.Sprintf("v%d.%d.%d", serverInfo.Major, serverInfo.Minor, serverInfo.Revision)
if semver.Compare(cleanedClickHouseVersion, minSupportedClickHouseVersion) < 0 {
return nil, fmt.Errorf(
"provide S3 Transient Stage details explicitly or upgrade to ClickHouse version >= %v, current version is %s. %s",
minSupportedClickHouseVersion, clickHouseVersion,
minSupportedClickHouseVersion, cleanedClickHouseVersion,
"You can also contact PeerDB support for implicit S3 stage setup for older versions of ClickHouse.")
}
}
Expand All @@ -213,7 +203,7 @@ func NewClickHouseConnector(
}, nil
}

func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
func Connect(ctx context.Context, config *protos.ClickhouseConfig) (*ch.Client, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
Expand All @@ -236,23 +226,14 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
tlsSetting.RootCAs = caPool
}

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
Auth: clickhouse.Auth{
Database: config.Database,
Username: config.User,
Password: config.Password,
},
conn, err := ch.Dial(ctx, ch.Options{
Address: fmt.Sprintf("%s:%d", config.Host, config.Port),
Database: config.Database,
User: config.User,
Password: config.Password,
TLS: tlsSetting,
Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4},
ClientInfo: clickhouse.ClientInfo{
Products: []struct {
Name string
Version string
}{
{Name: "peerdb"},
},
},
Compression: ch.CompressionLZ4,
ClientName: "peerdb",
DialTimeout: 3600 * time.Second,
ReadTimeout: 3600 * time.Second,
})
Expand Down Expand Up @@ -290,21 +271,20 @@ var retryableExceptions = map[int32]struct{}{
}

func isRetryableException(err error) bool {
if ex, ok := err.(*clickhouse.Exception); ok {
if ex, ok := err.(*ch.Exception); ok {
if ex == nil {
return false
}
_, yes := retryableExceptions[ex.Code]
_, yes := retryableExceptions[int32(ex.Code)]
return yes
}
return errors.Is(err, io.EOF)
}

//nolint:unparam
func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...any) error {
func (c *ClickHouseConnector) exec(ctx context.Context, query string) error {
var err error
for i := range 5 {
err = c.database.Exec(ctx, query, args...)
err = c.database.Do(ctx, ch.Query{Body: query})
if !isRetryableException(err) {
break
}
Expand All @@ -316,11 +296,10 @@ func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...an
return err
}

func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
var rows driver.Rows
func (c *ClickHouseConnector) query(ctx context.Context, query ch.Query) error {
var err error
for i := range 5 {
rows, err = c.database.Query(ctx, query, args...)
err = c.database.Do(ctx, query)
if !isRetryableException(err) {
break
}
Expand All @@ -329,23 +308,7 @@ func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...a
time.Sleep(time.Second * time.Duration(i*5+1))
}
}
return rows, err
}

func (c *ClickHouseConnector) queryRow(ctx context.Context, query string, args ...any) driver.Row {
var row driver.Row
for i := range 5 {
row = c.database.QueryRow(ctx, query, args...)
err := row.Err()
if !isRetryableException(err) {
break
}
c.logger.Info("[queryRow] retryable error", slog.Any("error", row.Err()), slog.Any("query", query), slog.Int64("i", int64(i)))
if i < 4 {
time.Sleep(time.Second * time.Duration(i*5+1))
}
}
return row
return err
}

func (c *ClickHouseConnector) Close() error {
Expand All @@ -365,72 +328,80 @@ func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error {

func (c *ClickHouseConnector) execWithLogging(ctx context.Context, query string) error {
c.logger.Info("[clickhouse] executing DDL statement", slog.String("query", query))
return c.database.Exec(ctx, query)
return c.exec(ctx, query)
}

func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tables []string) error {
queryInput := make([]interface{}, 0, len(tables)+1)
queryInput = append(queryInput, c.config.Database)
escapedTables := make([]string, 0, len(tables))
for _, table := range tables {
queryInput = append(queryInput, table)
}
rows, err := c.query(ctx,
fmt.Sprintf("SELECT name,engine,total_rows FROM system.tables WHERE database=? AND table IN (%s)",
strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...)
if err != nil {
// TODO proper
escapedTables = append(escapedTables, "'"+table+"'")
}
var nameC chproto.ColStr
var engineC chproto.ColStr
var totalRowsC chproto.ColUInt64
if err := c.query(ctx, ch.Query{
Body: fmt.Sprintf(
"SELECT name,engine,total_rows FROM system.tables WHERE database='%s' AND table IN (%s)",
c.config.Database, strings.Join(escapedTables, ",")),
Result: chproto.Results{
{Name: "name", Data: &nameC},
{Name: "engine", Data: &engineC},
{Name: "total_rows", Data: &totalRowsC},
},
OnResult: func(ctx context.Context, block chproto.Block) error {
for idx := range block.Rows {
name := nameC.Row(idx)
engine := engineC.Row(idx)
totalRows := totalRowsC[idx]
if totalRows != 0 {
return fmt.Errorf("table %s exists and is not empty", name)
}
if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) {
c.logger.Warn("[clickhouse] table engine not explicitly supported",
slog.String("table", name), slog.String("engine", engine))
}
}
return nil
},
}); err != nil {
return fmt.Errorf("failed to get information for destination tables: %w", err)
}
defer rows.Close()

for rows.Next() {
var tableName, engine string
var totalRows uint64
err = rows.Scan(&tableName, &engine, &totalRows)
if err != nil {
return fmt.Errorf("failed to scan information for tables: %w", err)
}
if totalRows != 0 {
return fmt.Errorf("table %s exists and is not empty", tableName)
}
if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) {
c.logger.Warn("[clickhouse] table engine not explicitly supported",
slog.String("table", tableName), slog.String("engine", engine))
}
}
if rows.Err() != nil {
return fmt.Errorf("failed to read rows: %w", rows.Err())
}
return nil
}

func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context,
tables []string,
) (map[string][]*protos.FieldDescription, error) {
tableColumnsMapping := make(map[string][]*protos.FieldDescription, len(tables))
queryInput := make([]interface{}, 0, len(tables)+1)
queryInput = append(queryInput, c.config.Database)
func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, tables []string) (map[string][]*protos.FieldDescription, error) {
escapedTables := make([]string, 0, len(tables))
for _, table := range tables {
queryInput = append(queryInput, table)
}
rows, err := c.query(ctx,
fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=? AND table IN (%s)",
strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...)
if err != nil {
// TODO proper
escapedTables = append(escapedTables, "'"+table+"'")
}
tableColumnsMapping := make(map[string][]*protos.FieldDescription)
var nameC chproto.ColStr
var typeC chproto.ColStr
var tableC chproto.ColStr
if err := c.query(ctx, ch.Query{
Body: fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=%s AND table IN (%s)",
c.config.Database, strings.Join(escapedTables, ",")),
Result: chproto.Results{
{Name: "name", Data: &nameC},
{Name: "type", Data: &typeC},
{Name: "table", Data: &tableC},
},
OnResult: func(ctx context.Context, block chproto.Block) error {
for idx := range block.Rows {
table := tableC.Row(idx)
tableColumnsMapping[table] = append(tableColumnsMapping[table], &protos.FieldDescription{
Name: nameC.Row(idx),
Type: typeC.Row(idx),
})
}
return nil
},
},
); err != nil {
return nil, fmt.Errorf("failed to get columns for destination tables: %w", err)
}
defer rows.Close()
for rows.Next() {
var tableName string
var fieldDescription protos.FieldDescription
err = rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName)
if err != nil {
return nil, fmt.Errorf("failed to scan columns for tables: %w", err)
}
tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription)
}
if rows.Err() != nil {
return nil, fmt.Errorf("failed to read rows: %w", rows.Err())
}
return tableColumnsMapping, nil
}

Expand Down
Loading

0 comments on commit 8297d83

Please sign in to comment.