Skip to content

Commit

Permalink
[clickhouse] start moving validation to shared for reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 10, 2024
1 parent a6ae900 commit 9b67801
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 154 deletions.
164 changes: 12 additions & 152 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/x509"
"errors"
"fmt"
"io"
"log/slog"
"maps"
"net/url"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
chvalidate "github.com/PeerDB-io/peer-flow/shared/clickhouse"
)

type ClickHouseConnector struct {
Expand Down Expand Up @@ -270,84 +270,17 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
return conn, nil
}

// https://github.com/ClickHouse/clickhouse-kafka-connect/blob/2e0c17e2f900d29c00482b9d0a1f55cb678244e5/src/main/java/com/clickhouse/kafka/connect/util/Utils.java#L78-L93
//
//nolint:lll
var retryableExceptions = map[int32]struct{}{
3: {}, // UNEXPECTED_END_OF_FILE
107: {}, // FILE_DOESNT_EXIST
159: {}, // TIMEOUT_EXCEEDED
164: {}, // READONLY
202: {}, // TOO_MANY_SIMULTANEOUS_QUERIES
203: {}, // NO_FREE_CONNECTION
209: {}, // SOCKET_TIMEOUT
210: {}, // NETWORK_ERROR
241: {}, // MEMORY_LIMIT_EXCEEDED
242: {}, // TABLE_IS_READ_ONLY
252: {}, // TOO_MANY_PARTS
285: {}, // TOO_FEW_LIVE_REPLICAS
319: {}, // UNKNOWN_STATUS_OF_INSERT
425: {}, // SYSTEM_ERROR
999: {}, // KEEPER_EXCEPTION
}

func isRetryableException(err error) bool {
if ex, ok := err.(*clickhouse.Exception); ok {
if ex == nil {
return false
}
_, yes := retryableExceptions[ex.Code]
return yes
}
return errors.Is(err, io.EOF)
}

//nolint:unparam
func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...any) error {
var err error
for i := range 5 {
err = c.database.Exec(ctx, query, args...)
if !isRetryableException(err) {
break
}
c.logger.Info("[exec] retryable error", slog.Any("error", err), slog.Any("query", query), slog.Int64("i", int64(i)))
if i < 4 {
time.Sleep(time.Second * time.Duration(i*5+1))
}
}
return err
return chvalidate.Exec(ctx, c.logger, c.database, query, args...)
}

func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
var rows driver.Rows
var err error
for i := range 5 {
rows, err = c.database.Query(ctx, query, args...)
if !isRetryableException(err) {
break
}
c.logger.Info("[query] retryable error", slog.Any("error", err), slog.Any("query", query), slog.Int64("i", int64(i)))
if i < 4 {
time.Sleep(time.Second * time.Duration(i*5+1))
}
}
return rows, err
return chvalidate.Query(ctx, c.logger, c.database, query, args...)
}

func (c *ClickHouseConnector) queryRow(ctx context.Context, query string, args ...any) driver.Row {
var row driver.Row
for i := range 5 {
row = c.database.QueryRow(ctx, query, args...)
err := row.Err()
if !isRetryableException(err) {
break
}
c.logger.Info("[queryRow] retryable error", slog.Any("error", row.Err()), slog.Any("query", query), slog.Int64("i", int64(i)))
if i < 4 {
time.Sleep(time.Second * time.Duration(i*5+1))
}
}
return row
return chvalidate.QueryRow(ctx, c.logger, c.database, query, args...)
}

func (c *ClickHouseConnector) Close() error {
Expand All @@ -367,78 +300,11 @@ func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error {

func (c *ClickHouseConnector) execWithLogging(ctx context.Context, query string) error {
c.logger.Info("[clickhouse] executing DDL statement", slog.String("query", query))
return c.database.Exec(ctx, query)
}

func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tables []string, optedForInitialLoad bool) error {
queryInput := make([]interface{}, 0, len(tables)+1)
queryInput = append(queryInput, c.config.Database)
for _, table := range tables {
queryInput = append(queryInput, table)
}
rows, err := c.query(ctx,
fmt.Sprintf("SELECT name,engine,total_rows FROM system.tables WHERE database=? AND name IN (%s)",
strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...)
if err != nil {
return fmt.Errorf("failed to get information for destination tables: %w", err)
}
defer rows.Close()

for rows.Next() {
var tableName, engine string
var totalRows uint64
if err := rows.Scan(&tableName, &engine, &totalRows); err != nil {
return fmt.Errorf("failed to scan information for tables: %w", err)
}
if totalRows != 0 && optedForInitialLoad {
return fmt.Errorf("table %s exists and is not empty", tableName)
}
if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) {
c.logger.Warn("[clickhouse] table engine not explicitly supported",
slog.String("table", tableName), slog.String("engine", engine))
}
if peerdbenv.PeerDBOnlyClickHouseAllowed() && !strings.HasPrefix(engine, "Shared") {
return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", tableName)
}
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to read rows: %w", err)
}
return nil
}

func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context,
tables []string,
) (map[string][]*protos.FieldDescription, error) {
tableColumnsMapping := make(map[string][]*protos.FieldDescription, len(tables))
queryInput := make([]interface{}, 0, len(tables)+1)
queryInput = append(queryInput, c.config.Database)
for _, table := range tables {
queryInput = append(queryInput, table)
}
rows, err := c.query(ctx,
fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=? AND table IN (%s)",
strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...)
if err != nil {
return nil, fmt.Errorf("failed to get columns for destination tables: %w", err)
}
defer rows.Close()
for rows.Next() {
var tableName string
var fieldDescription protos.FieldDescription
if err := rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName); err != nil {
return nil, fmt.Errorf("failed to scan columns for tables: %w", err)
}
tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to read rows: %w", err)
}
return tableColumnsMapping, nil
return c.exec(ctx, query)
}

func (c *ClickHouseConnector) processTableComparison(dstTableName string, srcSchema *protos.TableSchema,
dstSchema []*protos.FieldDescription, peerDBColumns []string, tableMapping *protos.TableMapping,
dstSchema []chvalidate.ClickHouseColumn, peerDBColumns []string, tableMapping *protos.TableMapping,
) error {
for _, srcField := range srcSchema.Columns {
colName := srcField.Name
Expand Down Expand Up @@ -481,15 +347,9 @@ func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *p
tableNameSchemaMapping map[string]*protos.TableSchema,
) error {
if peerdbenv.PeerDBOnlyClickHouseAllowed() {
// this is to indicate ClickHouse Cloud service is now creating tables with Shared* by default
var cloudModeEngine bool
if err := c.queryRow(ctx,
"SELECT value='2' AND changed='1' AND readonly='1' FROM system.settings WHERE name = 'cloud_mode_engine'").
Scan(&cloudModeEngine); err != nil {
return fmt.Errorf("failed to validate cloud_mode_engine setting: %w", err)
}
if !cloudModeEngine {
return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support")
err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database)
if err != nil {
return err
}
}

Expand All @@ -504,13 +364,13 @@ func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *p
// In the case of resync, we don't need to check the content or structure of the original tables;
// they'll anyways get swapped out with the _resync tables which we CREATE OR REPLACE
if !req.Resync {
err := c.checkTablesEmptyAndEngine(ctx, dstTableNames, req.DoInitialSnapshot)
if err != nil {
if err := chvalidate.CheckIfTablesEmptyAndEngine(ctx, c.logger, c.database,
dstTableNames, req.DoInitialSnapshot, peerdbenv.PeerDBOnlyClickHouseAllowed()); err != nil {
return err
}
}
// optimization: fetching columns for all tables at once
chTableColumnsMapping, err := c.getTableColumnsMapping(ctx, dstTableNames)
chTableColumnsMapping, err := chvalidate.GetTableColumnsMapping(ctx, c.logger, c.database, dstTableNames)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const (
versionColType = "Int64"
)

var acceptableTableEngines = []string{"ReplacingMergeTree", "MergeTree"}

func (c *ClickHouseConnector) StartSetupNormalizedTables(_ context.Context) (interface{}, error) {
return nil, nil
}
Expand Down
98 changes: 98 additions & 0 deletions flow/shared/clickhouse/query_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package clickhouse

import (
"context"
"errors"
"io"
"log/slog"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"go.temporal.io/sdk/log"
)

// retryableExceptions is the set of ClickHouse exception codes that
// isRetryableException accepts as retryable. Only key membership matters;
// the trailing comment on each entry is the ClickHouse name of that code.
//
// Source of the list:
// https://github.com/ClickHouse/clickhouse-kafka-connect/blob/2e0c17e2f900d29c00482b9d0a1f55cb678244e5/src/main/java/com/clickhouse/kafka/connect/util/Utils.java#L78-L93
//
//nolint:lll
var retryableExceptions = map[int32]struct{}{
3: {}, // UNEXPECTED_END_OF_FILE
107: {}, // FILE_DOESNT_EXIST
159: {}, // TIMEOUT_EXCEEDED
164: {}, // READONLY
202: {}, // TOO_MANY_SIMULTANEOUS_QUERIES
203: {}, // NO_FREE_CONNECTION
209: {}, // SOCKET_TIMEOUT
210: {}, // NETWORK_ERROR
241: {}, // MEMORY_LIMIT_EXCEEDED
242: {}, // TABLE_IS_READ_ONLY
252: {}, // TOO_MANY_PARTS
285: {}, // TOO_FEW_LIVE_REPLICAS
319: {}, // UNKNOWN_STATUS_OF_INSERT
425: {}, // SYSTEM_ERROR
999: {}, // KEEPER_EXCEPTION
}

// isRetryableException reports whether err is worth retrying.
//
// It returns true when err is, or wraps, a non-nil *clickhouse.Exception
// whose Code is a key of retryableExceptions, or when no such exception is
// present and err matches io.EOF. A nil error is never retryable.
func isRetryableException(err error) bool {
	// errors.As, unlike a direct type assertion, also finds an exception that
	// has been wrapped (e.g. with fmt.Errorf and %w) on its way up.
	var ex *clickhouse.Exception
	if errors.As(err, &ex) {
		// A typed-nil *clickhouse.Exception stored in the error interface
		// still matches; guard before dereferencing it.
		if ex == nil {
			return false
		}
		_, retryable := retryableExceptions[ex.Code]
		return retryable
	}
	return errors.Is(err, io.EOF)
}

// Exec runs query on conn and retries while the returned error is accepted
// by isRetryableException.
//
// At most 5 attempts are made. Each retryable failure is logged; between
// attempts the call waits 1s, 6s, 11s and 16s respectively. The wait is
// abandoned as soon as ctx is done, in which case the returned error joins
// the last ClickHouse error with ctx.Err(). Otherwise the error of the final
// attempt is returned (nil on success).
func Exec(ctx context.Context, logger log.Logger,
	conn clickhouse.Conn, query string, args ...any,
) error {
	const maxAttempts = 5

	var err error
	for attempt := range maxAttempts {
		err = conn.Exec(ctx, query, args...)
		if !isRetryableException(err) {
			return err
		}
		logger.Info("[exec] retryable error",
			slog.Any("error", err), slog.String("query", query), slog.Int64("retry", int64(attempt)))
		if attempt == maxAttempts-1 {
			// Out of attempts: no point in waiting before giving up.
			break
		}

		// Linear backoff that stays responsive to cancellation; a plain
		// time.Sleep would keep a cancelled caller blocked for the full wait.
		backoff := time.NewTimer(time.Second * time.Duration(attempt*5+1))
		select {
		case <-ctx.Done():
			backoff.Stop()
			return errors.Join(err, ctx.Err())
		case <-backoff.C:
		}
	}
	return err
}

// Query runs query on conn and retries while the returned error is accepted
// by isRetryableException.
//
// At most 5 attempts are made. Each retryable failure is logged; between
// attempts the call waits 1s, 6s, 11s and 16s respectively. The wait is
// abandoned as soon as ctx is done, in which case nil rows are returned
// together with an error joining the last ClickHouse error and ctx.Err().
// Otherwise the rows and error of the final attempt are returned.
func Query(ctx context.Context, logger log.Logger,
	conn clickhouse.Conn, query string, args ...any,
) (driver.Rows, error) {
	const maxAttempts = 5

	var (
		rows driver.Rows
		err  error
	)
	for attempt := range maxAttempts {
		rows, err = conn.Query(ctx, query, args...)
		if !isRetryableException(err) {
			return rows, err
		}
		logger.Info("[query] retryable error",
			slog.Any("error", err), slog.String("query", query), slog.Int64("retry", int64(attempt)))
		if attempt == maxAttempts-1 {
			// Out of attempts: no point in waiting before giving up.
			break
		}

		// Linear backoff that stays responsive to cancellation; a plain
		// time.Sleep would keep a cancelled caller blocked for the full wait.
		backoff := time.NewTimer(time.Second * time.Duration(attempt*5+1))
		select {
		case <-ctx.Done():
			backoff.Stop()
			return nil, errors.Join(err, ctx.Err())
		case <-backoff.C:
		}
	}
	return rows, err
}

// QueryRow runs query on conn and retries while the row's Err() is accepted
// by isRetryableException.
//
// At most 5 attempts are made. Each retryable failure is logged; between
// attempts the call waits 1s, 6s, 11s and 16s respectively. The wait is
// abandoned as soon as ctx is done. In every case the row from the most
// recent attempt is returned, so a remaining failure is reported through
// that row's Err().
func QueryRow(ctx context.Context, logger log.Logger,
	conn clickhouse.Conn, query string, args ...any,
) driver.Row {
	const maxAttempts = 5

	var row driver.Row
	for attempt := range maxAttempts {
		row = conn.QueryRow(ctx, query, args...)
		err := row.Err()
		if !isRetryableException(err) {
			return row
		}
		logger.Info("[queryRow] retryable error",
			slog.Any("error", err), slog.String("query", query), slog.Int64("retry", int64(attempt)))
		if attempt == maxAttempts-1 {
			// Out of attempts: no point in waiting before giving up.
			break
		}

		// Linear backoff that stays responsive to cancellation; a plain
		// time.Sleep would keep a cancelled caller blocked for the full wait.
		backoff := time.NewTimer(time.Second * time.Duration(attempt*5+1))
		select {
		case <-ctx.Done():
			backoff.Stop()
			return row
		case <-backoff.C:
		}
	}
	return row
}
Loading

0 comments on commit 9b67801

Please sign in to comment.