diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 42e16b4f4d..de8b2461f3 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -6,7 +6,6 @@ import ( "crypto/x509" "errors" "fmt" - "io" "log/slog" "maps" "net/url" @@ -26,6 +25,7 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" + chvalidate "github.com/PeerDB-io/peer-flow/shared/clickhouse" ) type ClickHouseConnector struct { @@ -270,84 +270,17 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou return conn, nil } -// https://github.com/ClickHouse/clickhouse-kafka-connect/blob/2e0c17e2f900d29c00482b9d0a1f55cb678244e5/src/main/java/com/clickhouse/kafka/connect/util/Utils.java#L78-L93 -// -//nolint:lll -var retryableExceptions = map[int32]struct{}{ - 3: {}, // UNEXPECTED_END_OF_FILE - 107: {}, // FILE_DOESNT_EXIST - 159: {}, // TIMEOUT_EXCEEDED - 164: {}, // READONLY - 202: {}, // TOO_MANY_SIMULTANEOUS_QUERIES - 203: {}, // NO_FREE_CONNECTION - 209: {}, // SOCKET_TIMEOUT - 210: {}, // NETWORK_ERROR - 241: {}, // MEMORY_LIMIT_EXCEEDED - 242: {}, // TABLE_IS_READ_ONLY - 252: {}, // TOO_MANY_PARTS - 285: {}, // TOO_FEW_LIVE_REPLICAS - 319: {}, // UNKNOWN_STATUS_OF_INSERT - 425: {}, // SYSTEM_ERROR - 999: {}, // KEEPER_EXCEPTION -} - -func isRetryableException(err error) bool { - if ex, ok := err.(*clickhouse.Exception); ok { - if ex == nil { - return false - } - _, yes := retryableExceptions[ex.Code] - return yes - } - return errors.Is(err, io.EOF) -} - //nolint:unparam func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...any) error { - var err error - for i := range 5 { - err = c.database.Exec(ctx, query, args...) - if !isRetryableException(err) { - break - } - c.logger.Info("[exec] retryable error", slog.Any("error", err), slog.Any("query", query), slog.Int64("i", int64(i))) - if i < 4 { - time.Sleep(time.Second * time.Duration(i*5+1)) - } - } - return err + return chvalidate.Exec(ctx, c.logger, c.database, query, args...) } func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...any) (driver.Rows, error) { - var rows driver.Rows - var err error - for i := range 5 { - rows, err = c.database.Query(ctx, query, args...) - if !isRetryableException(err) { - break - } - c.logger.Info("[query] retryable error", slog.Any("error", err), slog.Any("query", query), slog.Int64("i", int64(i))) - if i < 4 { - time.Sleep(time.Second * time.Duration(i*5+1)) - } - } - return rows, err + return chvalidate.Query(ctx, c.logger, c.database, query, args...) } func (c *ClickHouseConnector) queryRow(ctx context.Context, query string, args ...any) driver.Row { - var row driver.Row - for i := range 5 { - row = c.database.QueryRow(ctx, query, args...) - err := row.Err() - if !isRetryableException(err) { - break - } - c.logger.Info("[queryRow] retryable error", slog.Any("error", row.Err()), slog.Any("query", query), slog.Int64("i", int64(i))) - if i < 4 { - time.Sleep(time.Second * time.Duration(i*5+1)) - } - } - return row + return chvalidate.QueryRow(ctx, c.logger, c.database, query, args...) } func (c *ClickHouseConnector) Close() error { @@ -367,78 +300,11 @@ func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error { func (c *ClickHouseConnector) execWithLogging(ctx context.Context, query string) error { c.logger.Info("[clickhouse] executing DDL statement", slog.String("query", query)) - return c.database.Exec(ctx, query) -} - -func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tables []string, optedForInitialLoad bool) error { - queryInput := make([]interface{}, 0, len(tables)+1) - queryInput = append(queryInput, c.config.Database) - for _, table := range tables { - queryInput = append(queryInput, table) - } - rows, err := c.query(ctx, - fmt.Sprintf("SELECT name,engine,total_rows FROM system.tables WHERE database=? AND name IN (%s)", - strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) - if err != nil { - return fmt.Errorf("failed to get information for destination tables: %w", err) - } - defer rows.Close() - - for rows.Next() { - var tableName, engine string - var totalRows uint64 - if err := rows.Scan(&tableName, &engine, &totalRows); err != nil { - return fmt.Errorf("failed to scan information for tables: %w", err) - } - if totalRows != 0 && optedForInitialLoad { - return fmt.Errorf("table %s exists and is not empty", tableName) - } - if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) { - c.logger.Warn("[clickhouse] table engine not explicitly supported", - slog.String("table", tableName), slog.String("engine", engine)) - } - if peerdbenv.PeerDBOnlyClickHouseAllowed() && !strings.HasPrefix(engine, "Shared") { - return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", tableName) - } - } - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to read rows: %w", err) - } - return nil -} - -func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, - tables []string, -) (map[string][]*protos.FieldDescription, error) { - tableColumnsMapping := make(map[string][]*protos.FieldDescription, len(tables)) - queryInput := make([]interface{}, 0, len(tables)+1) - queryInput = append(queryInput, c.config.Database) - for _, table := range tables { - queryInput = append(queryInput, table) - } - rows, err := c.query(ctx, - fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=? AND table IN (%s)", - strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) - if err != nil { - return nil, fmt.Errorf("failed to get columns for destination tables: %w", err) - } - defer rows.Close() - for rows.Next() { - var tableName string - var fieldDescription protos.FieldDescription - if err := rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName); err != nil { - return nil, fmt.Errorf("failed to scan columns for tables: %w", err) - } - tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription) - } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to read rows: %w", err) - } - return tableColumnsMapping, nil + return c.exec(ctx, query) } func (c *ClickHouseConnector) processTableComparison(dstTableName string, srcSchema *protos.TableSchema, - dstSchema []*protos.FieldDescription, peerDBColumns []string, tableMapping *protos.TableMapping, + dstSchema []chvalidate.ClickHouseColumn, peerDBColumns []string, tableMapping *protos.TableMapping, ) error { for _, srcField := range srcSchema.Columns { colName := srcField.Name @@ -481,15 +347,9 @@ func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *p tableNameSchemaMapping map[string]*protos.TableSchema, ) error { if peerdbenv.PeerDBOnlyClickHouseAllowed() { - // this is to indicate ClickHouse Cloud service is now creating tables with Shared* by default - var cloudModeEngine bool - if err := c.queryRow(ctx, - "SELECT value='2' AND changed='1' AND readonly='1' FROM system.settings WHERE name = 'cloud_mode_engine'"). - Scan(&cloudModeEngine); err != nil { - return fmt.Errorf("failed to validate cloud_mode_engine setting: %w", err) - } - if !cloudModeEngine { - return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support") + err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database) + if err != nil { + return err } } @@ -504,13 +364,13 @@ func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *p // In the case of resync, we don't need to check the content or structure of the original tables; // they'll anyways get swapped out with the _resync tables which we CREATE OR REPLACE if !req.Resync { - err := c.checkTablesEmptyAndEngine(ctx, dstTableNames, req.DoInitialSnapshot) - if err != nil { + if err := chvalidate.CheckIfTablesEmptyAndEngine(ctx, c.logger, c.database, + dstTableNames, req.DoInitialSnapshot, peerdbenv.PeerDBOnlyClickHouseAllowed()); err != nil { return err } } // optimization: fetching columns for all tables at once - chTableColumnsMapping, err := c.getTableColumnsMapping(ctx, dstTableNames) + chTableColumnsMapping, err := chvalidate.GetTableColumnsMapping(ctx, c.logger, c.database, dstTableNames) if err != nil { return err } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index f7f6d8f980..36e510e405 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -28,8 +28,6 @@ const ( versionColType = "Int64" ) -var acceptableTableEngines = []string{"ReplacingMergeTree", "MergeTree"} - func (c *ClickHouseConnector) StartSetupNormalizedTables(_ context.Context) (interface{}, error) { return nil, nil } diff --git a/flow/shared/clickhouse/query_retry.go b/flow/shared/clickhouse/query_retry.go new file mode 100644 index 0000000000..8cc3f68aa2 --- /dev/null +++ b/flow/shared/clickhouse/query_retry.go @@ -0,0 +1,98 @@ +package clickhouse + +import ( + "context" + "errors" + "io" + "log/slog" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "go.temporal.io/sdk/log" +) + +// https://github.com/ClickHouse/clickhouse-kafka-connect/blob/2e0c17e2f900d29c00482b9d0a1f55cb678244e5/src/main/java/com/clickhouse/kafka/connect/util/Utils.java#L78-L93 +// +//nolint:lll +var retryableExceptions = map[int32]struct{}{ + 3: {}, // UNEXPECTED_END_OF_FILE + 107: {}, // FILE_DOESNT_EXIST + 159: {}, // TIMEOUT_EXCEEDED + 164: {}, // READONLY + 202: {}, // TOO_MANY_SIMULTANEOUS_QUERIES + 203: {}, // NO_FREE_CONNECTION + 209: {}, // SOCKET_TIMEOUT + 210: {}, // NETWORK_ERROR + 241: {}, // MEMORY_LIMIT_EXCEEDED + 242: {}, // TABLE_IS_READ_ONLY + 252: {}, // TOO_MANY_PARTS + 285: {}, // TOO_FEW_LIVE_REPLICAS + 319: {}, // UNKNOWN_STATUS_OF_INSERT + 425: {}, // SYSTEM_ERROR + 999: {}, // KEEPER_EXCEPTION +} + +func isRetryableException(err error) bool { + if ex, ok := err.(*clickhouse.Exception); ok { + if ex == nil { + return false + } + _, yes := retryableExceptions[ex.Code] + return yes + } + return errors.Is(err, io.EOF) +} + +func Exec(ctx context.Context, logger log.Logger, + conn clickhouse.Conn, query string, args ...any, +) error { + var err error + for i := range 5 { + err = conn.Exec(ctx, query, args...) + if !isRetryableException(err) { + break + } + logger.Info("[exec] retryable error", slog.Any("error", err), slog.String("query", query), slog.Int64("retry", int64(i))) + if i < 4 { + time.Sleep(time.Second * time.Duration(i*5+1)) + } + } + return err +} + +func Query(ctx context.Context, logger log.Logger, + conn clickhouse.Conn, query string, args ...any, +) (driver.Rows, error) { + var rows driver.Rows + var err error + for i := range 5 { + rows, err = conn.Query(ctx, query, args...) + if !isRetryableException(err) { + break + } + logger.Info("[query] retryable error", slog.Any("error", err), slog.String("query", query), slog.Int64("retry", int64(i))) + if i < 4 { + time.Sleep(time.Second * time.Duration(i*5+1)) + } + } + return rows, err +} + +func QueryRow(ctx context.Context, logger log.Logger, + conn clickhouse.Conn, query string, args ...any, +) driver.Row { + var row driver.Row + for i := range 5 { + row = conn.QueryRow(ctx, query, args...) + err := row.Err() + if !isRetryableException(err) { + break + } + logger.Info("[queryRow] retryable error", slog.Any("error", err), slog.String("query", query), slog.Int64("retry", int64(i))) + if i < 4 { + time.Sleep(time.Second * time.Duration(i*5+1)) + } + } + return row +} diff --git a/flow/shared/clickhouse/validation.go b/flow/shared/clickhouse/validation.go new file mode 100644 index 0000000000..4915b6fc2d --- /dev/null +++ b/flow/shared/clickhouse/validation.go @@ -0,0 +1,105 @@ +package clickhouse + +import ( + "context" + "errors" + "fmt" + "log/slog" + "slices" + "strings" + + "github.com/ClickHouse/clickhouse-go/v2" + "go.temporal.io/sdk/log" +) + +var acceptableTableEngines = []string{"ReplacingMergeTree", "MergeTree"} + +func CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx context.Context, logger log.Logger, + conn clickhouse.Conn, +) error { + // this is to indicate ClickHouse Cloud service is now creating tables with Shared* by default + var cloudModeEngine bool + if err := QueryRow(ctx, logger, conn, + "SELECT value='2' AND changed='1' AND readonly='1' FROM system.settings WHERE name = 'cloud_mode_engine'"). + Scan(&cloudModeEngine); err != nil { + return fmt.Errorf("failed to validate cloud_mode_engine setting: %w", err) + } + if !cloudModeEngine { + return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support") + } + return nil +} + +func CheckIfTablesEmptyAndEngine(ctx context.Context, logger log.Logger, conn clickhouse.Conn, + tables []string, initialSnapshotEnabled bool, checkForCloudSMT bool, +) error { + queryInput := make([]interface{}, 0, len(tables)) + for _, table := range tables { + queryInput = append(queryInput, table) + } + rows, err := Query(ctx, logger, conn, + fmt.Sprintf("SELECT name,engine,total_rows FROM system.tables WHERE database=currentDatabase() AND name IN (%s)", + strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) + if err != nil { + return fmt.Errorf("failed to get information for destination tables: %w", err) + } + defer rows.Close() + + for rows.Next() { + var tableName, engine string + var totalRows uint64 + if err := rows.Scan(&tableName, &engine, &totalRows); err != nil { + return fmt.Errorf("failed to scan information for tables: %w", err) + } + if totalRows != 0 && initialSnapshotEnabled { + return fmt.Errorf("table %s exists and is not empty", tableName) + } + if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) { + logger.Warn("[clickhouse] table engine not explicitly supported", + slog.String("table", tableName), slog.String("engine", engine)) + } + if checkForCloudSMT && !strings.HasPrefix(engine, "Shared") { + return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", tableName) + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to read rows: %w", err) + } + + return nil +} + +type ClickHouseColumn struct { + Name string + Type string +} + +func GetTableColumnsMapping(ctx context.Context, logger log.Logger, conn clickhouse.Conn, + tables []string, +) (map[string][]ClickHouseColumn, error) { + tableColumnsMapping := make(map[string][]ClickHouseColumn, len(tables)) + queryInput := make([]interface{}, 0, len(tables)) + for _, table := range tables { + queryInput = append(queryInput, table) + } + rows, err := Query(ctx, logger, conn, + fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=currentDatabase() AND table IN (%s)", + strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) + if err != nil { + return nil, fmt.Errorf("failed to get columns for destination tables: %w", err) + } + defer rows.Close() + for rows.Next() { + var tableName string + var clickhouseColumn ClickHouseColumn + if err := rows.Scan(&clickhouseColumn.Name, &clickhouseColumn.Type, &tableName); err != nil { + return nil, fmt.Errorf("failed to scan columns for tables: %w", err) + } + tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], clickhouseColumn) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to read rows: %w", err) + } + + return tableColumnsMapping, nil +}