From 73a71958ab14df780e538738af3c3bc0e2886579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 11 Oct 2024 15:53:53 +0000 Subject: [PATCH 1/4] drafts on a plane --- flow/connectors/clickhouse/cdc.go | 38 ++- flow/connectors/clickhouse/clickhouse.go | 297 +++++++++---------- flow/connectors/clickhouse/normalize.go | 51 ++-- flow/connectors/clickhouse/qrep.go | 18 +- flow/connectors/clickhouse/qrep_avro_sync.go | 10 +- flow/e2e/clickhouse/clickhouse.go | 262 +++++++++++----- flow/e2e/clickhouse/peer_flow_ch_test.go | 5 +- flow/go.mod | 16 +- flow/go.sum | 46 +-- 9 files changed, 423 insertions(+), 320 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 5dc8a14628..53a1434606 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,14 +2,13 @@ package connclickhouse import ( "context" - "database/sql" "errors" "fmt" "log/slog" "strings" - "github.com/ClickHouse/clickhouse-go/v2" - _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -19,8 +18,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;` - dropTableIfExistsSQL = "DROP TABLE IF EXISTS `%s`;" + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` ) // getRawTableName returns the raw table name for the given table identifier. @@ -29,17 +28,26 @@ func (c *ClickHouseConnector) getRawTableName(flowJobName string) string { } func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) { - var result sql.NullInt32 - err := c.queryRow(ctx, checkIfTableExistsSQL, databaseName, tableIdentifier).Scan(&result) - if err != nil { + // TODO escape + var existsC chproto.ColUInt8 + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf(checkIfTableExistsSQL, "'"+databaseName+"'", "'"+tableIdentifier+"'"), + Result: chproto.Results{ + {Name: "table_exists", Data: &existsC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + if block.Rows != 1 { + return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) + } + if block.Info.Overflows { + return errors.New("[clickhouse] checkIfTableExists: expected 1 row, got block with overflow") + } + return nil + }, + }); err != nil { return false, fmt.Errorf("error while reading result row: %w", err) } - - if !result.Valid { - return false, errors.New("[clickhouse] checkIfTableExists: result is not valid") - } - - return result.Int32 == 1, nil + return existsC[0] != 0, nil } func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { @@ -203,7 +211,7 @@ func (c *ClickHouseConnector) RenameTables( if err := c.execWithLogging(ctx, fmt.Sprintf(dropTableIfExistsSQL, renameRequest.CurrentName)); err != nil { return nil, fmt.Errorf("unable to drop exchanged table %s: %w", renameRequest.CurrentName, err) } - } else if ex, ok := err.(*clickhouse.Exception); !ok || ex.Code != 48 { + } else if ex, ok := err.(*ch.Exception); !ok || ex.Code != 48 { // code 48 == not implemented -> move on to the fallback code, in all other error codes / types // return, since we know/assume that the exchange would be the sensible action return nil, fmt.Errorf("unable to exchange tables %s and %s: %w", renameRequest.NewName, renameRequest.CurrentName, err) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index d994620427..8a7950fc3d 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -14,9 +14,8 @@ import ( "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - chproto "github.com/ClickHouse/clickhouse-go/v2/lib/proto" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/aws/aws-sdk-go-v2/aws" "go.temporal.io/sdk/log" @@ -30,7 +29,7 @@ import ( type ClickHouseConnector struct { *metadataStore.PostgresMetadata - database clickhouse.Conn + database *ch.Client logger log.Logger config *protos.ClickhouseConfig credsProvider *utils.ClickHouseS3Credentials @@ -173,6 +172,17 @@ func NewClickHouseConnector( if err != nil { return nil, err } + if credentials.AWS.SessionToken != "" { + // 24.3.1 is minimum version of ClickHouse that actually supports session token + // https://github.com/ClickHouse/ClickHouse/issues/61230 + chVersion := database.ServerInfo() + if chVersion.Major < 24 || (chVersion.Major == 24 && (chVersion.Minor < 3 || (chVersion.Minor == 3 && chVersion.Patch < 1))) { + return nil, fmt.Errorf( + "provide S3 Transient Stage details explicitly or upgrade to ClickHouse version >= 24.3.1, current version is %d.%d.%d. %s", + chVersion.Major, chVersion.Minor, chVersion.Patch, + "You can also contact PeerDB support for implicit S3 stage setup for older versions of ClickHouse.") + } + } connector := &ClickHouseConnector{ database: database, @@ -185,28 +195,10 @@ func NewClickHouseConnector( }, } - if credentials.AWS.SessionToken != "" { - // 24.3.1 is minimum version of ClickHouse that actually supports session token - // https://github.com/ClickHouse/ClickHouse/issues/61230 - clickHouseVersion, err := database.ServerVersion() - if err != nil { - return nil, fmt.Errorf("failed to get ClickHouse version: %w", err) - } - if !chproto.CheckMinVersion( - chproto.Version{Major: 24, Minor: 3, Patch: 1}, - clickHouseVersion.Version, - ) { - return nil, fmt.Errorf( - "provide S3 Transient Stage details explicitly or upgrade to ClickHouse version >= 24.3.1, current version is %s. %s", - clickHouseVersion, - "You can also contact PeerDB support for implicit S3 stage setup for older versions of ClickHouse.") - } - } - return connector, nil } -func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) { +func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (*ch.Client, error) { var tlsSetting *tls.Config if !config.DisableTls { tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13} @@ -229,32 +221,42 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou tlsSetting.RootCAs = caPool } - // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency - settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} - if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { - return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) - } else if maxInsertThreads != 0 { - settings["max_insert_threads"] = maxInsertThreads - } + /* + // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency + settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} + if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { + return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) + } else if maxInsertThreads != 0 { + settings["max_insert_threads"] = maxInsertThreads + } - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, - Auth: clickhouse.Auth{ - Database: config.Database, - Username: config.User, - Password: config.Password, - }, - TLS: tlsSetting, - Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, - ClientInfo: clickhouse.ClientInfo{ - Products: []struct { - Name string - Version string - }{ - {Name: "peerdb"}, + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, + Auth: clickhouse.Auth{ + Database: config.Database, + Username: config.User, + Password: config.Password, }, - }, - Settings: settings, + TLS: tlsSetting, + Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, + ClientInfo: clickhouse.ClientInfo{ + Products: []struct { + Name string + Version string + }{ + {Name: "peerdb"}, + }, + }, + Settings: settings, + */ + conn, err := ch.Dial(ctx, ch.Options{ + Address: fmt.Sprintf("%s:%d", config.Host, config.Port), + Database: config.Database, + User: config.User, + Password: config.Password, + TLS: tlsSetting, + Compression: ch.CompressionLZ4, + ClientName: "peerdb", DialTimeout: 3600 * time.Second, ReadTimeout: 3600 * time.Second, }) @@ -292,21 +294,20 @@ var retryableExceptions = map[int32]struct{}{ } func isRetryableException(err error) bool { - if ex, ok := err.(*clickhouse.Exception); ok { + if ex, ok := err.(*ch.Exception); ok { if ex == nil { return false } - _, yes := retryableExceptions[ex.Code] + _, yes := retryableExceptions[int32(ex.Code)] return yes } return errors.Is(err, io.EOF) } -//nolint:unparam -func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...any) error { +func (c *ClickHouseConnector) exec(ctx context.Context, query string) error { var err error for i := range 5 { - err = c.database.Exec(ctx, query, args...) + err = c.database.Do(ctx, ch.Query{Body: query}) if !isRetryableException(err) { break } @@ -318,11 +319,10 @@ func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...an return err } -func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...any) (driver.Rows, error) { - var rows driver.Rows +func (c *ClickHouseConnector) query(ctx context.Context, query ch.Query) error { var err error for i := range 5 { - rows, err = c.database.Query(ctx, query, args...) + err = c.database.Do(ctx, query) if !isRetryableException(err) { break } @@ -331,23 +331,7 @@ func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...a time.Sleep(time.Second * time.Duration(i*5+1)) } } - return rows, err -} - -func (c *ClickHouseConnector) queryRow(ctx context.Context, query string, args ...any) driver.Row { - var row driver.Row - for i := range 5 { - row = c.database.QueryRow(ctx, query, args...) - err := row.Err() - if !isRetryableException(err) { - break - } - c.logger.Info("[queryRow] retryable error", slog.Any("error", row.Err()), slog.Any("query", query), slog.Int64("i", int64(i))) - if i < 4 { - time.Sleep(time.Second * time.Duration(i*5+1)) - } - } - return row + return err } func (c *ClickHouseConnector) Close() error { @@ -367,73 +351,83 @@ func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error { func (c *ClickHouseConnector) execWithLogging(ctx context.Context, query string) error { c.logger.Info("[clickhouse] executing DDL statement", slog.String("query", query)) - return c.database.Exec(ctx, query) + return c.exec(ctx, query) } func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tables []string, optedForInitialLoad bool) error { - queryInput := make([]interface{}, 0, len(tables)+1) - queryInput = append(queryInput, c.config.Database) + escapedTables := make([]string, 0, len(tables)) for _, table := range tables { - queryInput = append(queryInput, table) - } - rows, err := c.query(ctx, - fmt.Sprintf("SELECT name,engine,total_rows FROM system.tables WHERE database=? AND name IN (%s)", - strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) - if err != nil { + // TODO proper + escapedTables = append(escapedTables, "'"+table+"'") + } + var nameC chproto.ColStr + var engineC chproto.ColStr + var totalRowsC chproto.ColUInt64 + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf( + "SELECT name,engine,total_rows FROM system.tables WHERE database='%s' AND name IN (%s)", + c.config.Database, strings.Join(escapedTables, ",")), + Result: chproto.Results{ + {Name: "name", Data: &nameC}, + {Name: "engine", Data: &engineC}, + {Name: "total_rows", Data: &totalRowsC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + for idx := range block.Rows { + name := nameC.Row(idx) + engine := engineC.Row(idx) + totalRows := totalRowsC[idx] + if totalRows != 0 && optedForInitialLoad { + return fmt.Errorf("table %s exists and is not empty", name) + } + if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) { + c.logger.Warn("[clickhouse] table engine not explicitly supported", + slog.String("table", name), slog.String("engine", engine)) + } + if peerdbenv.PeerDBOnlyClickHouseAllowed() && !strings.HasPrefix(engine, "Shared") { + return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", name) + } + } + return nil + }, + }); err != nil { return fmt.Errorf("failed to get information for destination tables: %w", err) } - defer rows.Close() - - for rows.Next() { - var tableName, engine string - var totalRows uint64 - if err := rows.Scan(&tableName, &engine, &totalRows); err != nil { - return fmt.Errorf("failed to scan information for tables: %w", err) - } - if totalRows != 0 && optedForInitialLoad { - return fmt.Errorf("table %s exists and is not empty", tableName) - } - if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) { - c.logger.Warn("[clickhouse] table engine not explicitly supported", - slog.String("table", tableName), slog.String("engine", engine)) - } - if peerdbenv.PeerDBOnlyClickHouseAllowed() && !strings.HasPrefix(engine, "Shared") { - return fmt.Errorf("table %s exists and does not use SharedMergeTree engine", tableName) - } - } - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to read rows: %w", err) - } return nil } -func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, - tables []string, -) (map[string][]*protos.FieldDescription, error) { - tableColumnsMapping := make(map[string][]*protos.FieldDescription, len(tables)) - queryInput := make([]interface{}, 0, len(tables)+1) - queryInput = append(queryInput, c.config.Database) +func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, tables []string) (map[string][]*protos.FieldDescription, error) { + escapedTables := make([]string, 0, len(tables)) for _, table := range tables { - queryInput = append(queryInput, table) - } - rows, err := c.query(ctx, - fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=? AND table IN (%s)", - strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) - if err != nil { + // TODO proper + escapedTables = append(escapedTables, "'"+table+"'") + } + tableColumnsMapping := make(map[string][]*protos.FieldDescription) + var nameC chproto.ColStr + var typeC chproto.ColStr + var tableC chproto.ColStr + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=%s AND table IN (%s)", + c.config.Database, strings.Join(escapedTables, ",")), + Result: chproto.Results{ + {Name: "name", Data: &nameC}, + {Name: "type", Data: &typeC}, + {Name: "table", Data: &tableC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + for idx := range block.Rows { + table := tableC.Row(idx) + tableColumnsMapping[table] = append(tableColumnsMapping[table], &protos.FieldDescription{ + Name: nameC.Row(idx), + Type: typeC.Row(idx), + }) + } + return nil + }, + }, + ); err != nil { return nil, fmt.Errorf("failed to get columns for destination tables: %w", err) } - defer rows.Close() - for rows.Next() { - var tableName string - var fieldDescription protos.FieldDescription - if err := rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName); err != nil { - return nil, fmt.Errorf("failed to scan columns for tables: %w", err) - } - tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription) - } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to read rows: %w", err) - } return tableColumnsMapping, nil } @@ -482,15 +476,23 @@ func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *p ) error { if peerdbenv.PeerDBOnlyClickHouseAllowed() { // this is to indicate ClickHouse Cloud service is now creating tables with Shared* by default - var cloudModeEngine bool - if err := c.queryRow(ctx, - "SELECT value='2' AND changed='1' AND readonly='1' FROM system.settings WHERE name = 'cloud_mode_engine'"). - Scan(&cloudModeEngine); err != nil { + var cloudModeEngineC chproto.ColBool + if err := c.query(ctx, ch.Query{ + Body: "SELECT (value='2' AND changed='1' AND readonly='1') as engine FROM system.settings WHERE name = 'cloud_mode_engine'", + Result: chproto.Results{ + {Name: "engine", Data: &cloudModeEngineC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + for _, cloudModeEngine := range cloudModeEngineC { + if !cloudModeEngine { + return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support") + } + } + return nil + }, + }); err != nil { return fmt.Errorf("failed to validate cloud_mode_engine setting: %w", err) } - if !cloudModeEngine { - return errors.New("ClickHouse service is not migrated to use SharedMergeTree tables, please contact support") - } } peerDBColumns := []string{signColName, versionColName} @@ -536,19 +538,17 @@ func (c *ClickHouseConnector) CheckDestinationTables(ctx context.Context, req *p } func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error) { - clickhouseVersion, err := c.database.ServerVersion() - if err != nil { - return "", fmt.Errorf("failed to get ClickHouse version: %w", err) - } - c.logger.Info("[clickhouse] version", slog.Any("version", clickhouseVersion.DisplayName)) - return clickhouseVersion.Version.String(), nil + chVersion := c.database.ServerInfo() + chVersionStr := fmt.Sprintf("%d.%d.%d", chVersion.Major, chVersion.Minor, chVersion.Patch) + c.logger.Info("[clickhouse] version", slog.Any("version", chVersionStr)) + return chVersionStr, nil } -func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*protos.TableSchema, error) { +func GetTableSchemaForTable(tableName string, columns chproto.Results) (*protos.TableSchema, error) { colFields := make([]*protos.FieldDescription, 0, len(columns)) for _, column := range columns { var qkind qvalue.QValueKind - switch column.DatabaseTypeName() { + switch column.Data.Type() { case "String", "Nullable(String)": qkind = qvalue.QValueKindString case "Bool", "Nullable(Bool)": @@ -566,18 +566,18 @@ func GetTableSchemaForTable(tableName string, columns []driver.ColumnType) (*pro case "Date32", "Nullable(Date32)": qkind = qvalue.QValueKindDate default: - if strings.Contains(column.DatabaseTypeName(), "Decimal") { + if strings.Contains(string(column.Data.Type()), "Decimal") { qkind = qvalue.QValueKindNumeric } else { - return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.DatabaseTypeName()) + return nil, fmt.Errorf("failed to resolve QValueKind for %s", column.Data.Type()) } } colFields = append(colFields, &protos.FieldDescription{ - Name: column.Name(), + Name: column.Name, Type: string(qkind), TypeModifier: -1, - Nullable: column.Nullable(), + Nullable: column.Data.Type().Base() == "Nullable", }) } @@ -596,13 +596,12 @@ func (c *ClickHouseConnector) GetTableSchema( ) (map[string]*protos.TableSchema, error) { res := make(map[string]*protos.TableSchema, len(tableIdentifiers)) for _, tableName := range tableIdentifiers { - rows, err := c.database.Query(ctx, fmt.Sprintf("select * from %s limit 0", tableName)) + schema, err := c.getTableSchema(ctx, tableName) if err != nil { return nil, err } - tableSchema, err := GetTableSchemaForTable(tableName, rows.ColumnTypes()) - rows.Close() + tableSchema, err := GetTableSchemaForTable(tableName, schema) if err != nil { return nil, err } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index fabe07a35f..2414ee1f8f 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -3,8 +3,6 @@ package connclickhouse import ( "cmp" "context" - "database/sql" - "errors" "fmt" "log/slog" "slices" @@ -12,7 +10,8 @@ import ( "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "golang.org/x/sync/errgroup" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -285,7 +284,7 @@ func (c *ClickHouseConnector) NormalizeRecords( group, errCtx := errgroup.WithContext(ctx) for i := range parallelNormalize { group.Go(func() error { - var chConn clickhouse.Conn + var chConn *ch.Client if i == 0 { chConn = c.database } else { @@ -299,7 +298,7 @@ func (c *ClickHouseConnector) NormalizeRecords( for query := range queries { c.logger.Info("normalizing batch", slog.String("query", query)) - if err := chConn.Exec(errCtx, query); err != nil { + if err := chConn.Do(errCtx, ch.Query{Body: query}); err != nil { return fmt.Errorf("error while inserting into normalized table: %w", err) } } @@ -487,33 +486,25 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch( ) ([]string, error) { rawTbl := c.getRawTableName(flowJobName) - q := fmt.Sprintf( - `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d`, - rawTbl, normalizeBatchID, syncBatchID) - - rows, err := c.query(ctx, q) - if err != nil { - return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) - } - defer rows.Close() var tableNames []string - for rows.Next() { - var tableName sql.NullString - if err := rows.Scan(&tableName); err != nil { - return nil, fmt.Errorf("error while scanning table name: %w", err) - } - - if !tableName.Valid { - return nil, errors.New("table name is not valid") - } - - tableNames = append(tableNames, tableName.String) - } - - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to read rows: %w", err) + var tableNameC chproto.ColStr + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf( + `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id>%d AND _peerdb_batch_id<=%d`, + rawTbl, normalizeBatchID, syncBatchID), + Result: chproto.Results{ + {Name: "_peerdb_destination_table_name", Data: &tableNameC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + tableNames := slices.Grow(tableNames, block.Rows) + return tableNameC.ForEach(func(i int, s string) error { + tableNames = append(tableNames, s) + return nil + }) + }, + }); err != nil { + return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) } - return tableNames, nil } diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 376d05e7ae..5de6b05d2f 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -6,7 +6,8 @@ import ( "log/slog" "strings" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -44,15 +45,18 @@ func (c *ClickHouseConnector) SyncQRepRecords( return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream) } -func (c *ClickHouseConnector) getTableSchema(ctx context.Context, tableName string) ([]driver.ColumnType, error) { - queryString := fmt.Sprintf("SELECT * FROM `%s` LIMIT 0", tableName) - rows, err := c.query(ctx, queryString) - if err != nil { +func (c *ClickHouseConnector) getTableSchema(ctx context.Context, tableName string) (chproto.Results, error) { + // TODO escape + queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName) + var schema chproto.Results + if err := c.query(ctx, ch.Query{ + Body: queryString, + Result: schema.Auto(), + }); err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } - defer rows.Close() - return rows.ColumnTypes(), nil + return schema, nil } func (c *ClickHouseConnector) ConsolidateQRepPartitions(_ context.Context, config *protos.QRepConfig) error { diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 61450dd55c..e3c570f04e 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/PeerDB-io/peer-flow/connectors/utils" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" @@ -55,7 +55,7 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a s.config.DestinationTableIdentifier, avroFileUrl, creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) - return s.database.Exec(ctx, query) + return s.exec(ctx, query) } func (s *ClickHouseAvroSyncMethod) SyncRecords( @@ -99,7 +99,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, - dstTableSchema []driver.ColumnType, + dstTableSchema chproto.Results, stream *model.QRecordStream, ) (int, error) { dstTableName := config.DestinationTableIdentifier @@ -131,7 +131,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( avroFileUrl := utils.FileURLForS3Service(endpoint, region, s3o.Bucket, avroFile.FilePath) selector := make([]string, 0, len(dstTableSchema)) for _, col := range dstTableSchema { - colName := col.Name() + colName := col.Name if strings.EqualFold(colName, config.SoftDeleteColName) || strings.EqualFold(colName, signColName) || strings.EqualFold(colName, config.SyncedAtColName) || @@ -151,7 +151,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) - if err := s.database.Exec(ctx, query); err != nil { + if err := s.exec(ctx, query); err != nil { s.logger.Error("Failed to insert into select for ClickHouse", slog.Any("error", err)) return 0, err } diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index a36a5335b9..d736957ed2 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -2,12 +2,17 @@ package e2e_clickhouse import ( "context" + "encoding/binary" + "errors" "fmt" - "reflect" + "math/big" + "strconv" "strings" "testing" "time" + chgo "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/jackc/pgx/v5" "github.com/shopspring/decimal" "github.com/stretchr/testify/require" @@ -91,89 +96,206 @@ func (s ClickHouseSuite) Teardown() { e2e.TearDownPostgres(s) } -func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) { - ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) - if err != nil { - return nil, err +// from clickhouse-go lib/column/bigint.go +func endianSwap(src []byte, not bool) { + for i := range len(src) / 2 { + if not { + src[i], src[len(src)-i-1] = ^src[len(src)-i-1], ^src[i] + } else { + src[i], src[len(src)-i-1] = src[len(src)-i-1], src[i] + } } - defer ch.Close() +} - rows, err := ch.Query( - context.Background(), - fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table), - ) +// from clickhouse-go lib/column/bigint.go +func rawToBigInt(v []byte, signed bool) *big.Int { + // LittleEndian to BigEndian + endianSwap(v, false) + lt := new(big.Int) + if signed && len(v) > 0 && v[0]&0x80 != 0 { + // [0] ^ will +1 + for i := 0; i < len(v); i++ { + v[i] = ^v[i] + } + lt.SetBytes(v) + // neg ^ will -1 + lt.Not(lt) + } else { + lt.SetBytes(v) + } + return lt +} + +// largely taken from clickhouse-go lib/column/decimal.go +func decimalRow(col chproto.ColResult, i int) decimal.Decimal { + typ := string(col.Type()) + lparam := strings.LastIndex(typ, "(") + if lparam == -1 { + panic("no ( in " + typ) + } + params := typ[lparam+1:] + rparam := strings.Index(params, ")") + if rparam == -1 { + panic("no ) in params " + params + " of " + typ) + } + params = typ[:rparam] + _, scaleStr, ok := strings.Cut(params, ",") + if !ok { + panic("no , in params " + params + " of " + typ) + } + scaleStr = strings.TrimSpace(scaleStr) + scale, err := strconv.Atoi(scaleStr) if err != nil { - return nil, err + panic("failed to parse scale " + scaleStr + ": " + err.Error()) } - defer rows.Close() - batch := &model.QRecordBatch{} - types := rows.ColumnTypes() - row := make([]interface{}, 0, len(types)) - tableSchema, err := connclickhouse.GetTableSchemaForTable(table, types) + var value decimal.Decimal + switch vCol := col.(type) { + case *chproto.ColDecimal32: + v := vCol.Row(i) + value = decimal.New(int64(v), int32(-scale)) + case *chproto.ColDecimal64: + v := vCol.Row(i) + value = decimal.New(int64(v), int32(-scale)) + case *chproto.ColDecimal128: + v := vCol.Row(i) + b := make([]byte, 16) + binary.LittleEndian.PutUint64(b[0:64/8], v.Low) + binary.LittleEndian.PutUint64(b[64/8:128/8], v.High) + bv := rawToBigInt(b, true) + value = decimal.NewFromBigInt(bv, int32(-scale)) + case *chproto.ColDecimal256: + v := vCol.Row(i) + b := make([]byte, 32) + binary.LittleEndian.PutUint64(b[0:64/8], v.Low.Low) + binary.LittleEndian.PutUint64(b[64/8:128/8], v.Low.High) + binary.LittleEndian.PutUint64(b[128/8:192/8], v.High.Low) + binary.LittleEndian.PutUint64(b[192/8:256/8], v.High.High) + bv := rawToBigInt(b, true) + value = decimal.NewFromBigInt(bv, int32(-scale)) + } + return value +} + +func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) { + ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) if err != nil { return nil, err } + defer ch.Close() - for idx, ty := range types { - fieldDesc := tableSchema.Columns[idx] - row = append(row, reflect.New(ty.ScanType()).Interface()) - batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{ - Name: ty.Name(), - Type: qvalue.QValueKind(fieldDesc.Type), - Precision: 0, - Scale: 0, - Nullable: fieldDesc.Nullable, - }) + firstCol, _, _ := strings.Cut(cols, ",") + if firstCol == "" { + return nil, errors.New("no columns specified") } - - for rows.Next() { - if err := rows.Scan(row...); err != nil { - return nil, err - } - qrow := make([]qvalue.QValue, 0, len(row)) - for _, val := range row { - switch v := val.(type) { - case **string: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindString)) - } else { - qrow = append(qrow, qvalue.QValueString{Val: **v}) - } - case *string: - qrow = append(qrow, qvalue.QValueString{Val: *v}) - case **int32: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) - } else { - qrow = append(qrow, qvalue.QValueInt32{Val: **v}) - } - case *int32: - qrow = append(qrow, qvalue.QValueInt32{Val: *v}) - case **time.Time: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp)) - } else { - qrow = append(qrow, qvalue.QValueTimestamp{Val: **v}) + batch := &model.QRecordBatch{} + var schema chproto.Results + if err := ch.Do(context.Background(), chgo.Query{ + Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, + cols, table, firstCol), + Result: schema.Auto(), + OnResult: func(ctx context.Context, block chproto.Block) error { + if len(batch.Schema.Fields) == 0 { + for _, col := range schema { + nullable := strings.HasPrefix(string(col.Data.Type()), "Nullable(") + var qkind qvalue.QValueKind + switch col.Data.Type() { + case "String", "Nullable(String)": + qkind = qvalue.QValueKindString + case "Int32", "Nullable(Int32)": + qkind = qvalue.QValueKindInt32 + case "DateTime64(6)", "Nullable(DateTime64(6))": + qkind = qvalue.QValueKindTimestamp + case "Date32", "Nullable(Date32)": + qkind = qvalue.QValueKindDate + default: + if strings.Contains(string(col.Data.Type()), "Decimal") { + qkind = qvalue.QValueKindNumeric + } else { + return fmt.Errorf("failed to resolve QValueKind for %s", col.Data.Type()) + } + } + batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{ + Name: col.Name, + Type: qkind, + Precision: 0, + Scale: 0, + Nullable: nullable, + }) } - case *time.Time: - qrow = append(qrow, qvalue.QValueTimestamp{Val: *v}) - case **decimal.Decimal: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) - } else { - qrow = append(qrow, qvalue.QValueNumeric{Val: **v}) + } + + for idx := range block.Rows { + qrow := make([]qvalue.QValue, 0, block.Columns) + for _, col := range schema { + switch v := col.Data.(type) { + case *chproto.ColNullable[string]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindString)) + } else { + qrow = append(qrow, qvalue.QValueString{Val: v.Values.Row(idx)}) + } + case *chproto.ColStr: + qrow = append(qrow, qvalue.QValueString{Val: v.Row(idx)}) + case *chproto.ColNullable[int32]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) + } else { + qrow = append(qrow, qvalue.QValueInt32{Val: v.Values.Row(idx)}) + } + case *chproto.ColInt32: + qrow = append(qrow, qvalue.QValueInt32{Val: v.Row(idx)}) + case *chproto.ColNullable[time.Time]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp)) + } else { + qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Values.Row(idx)}) + } + case *chproto.ColDateTime64: + qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Row(idx)}) + case *chproto.ColNullable[chproto.Decimal32]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColNullable[chproto.Decimal64]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColNullable[chproto.Decimal128]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColNullable[chproto.Decimal256]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColDecimal32, + *chproto.ColDecimal64, + *chproto.ColDecimal128, + *chproto.ColDecimal256: + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v, idx)}) + default: + return fmt.Errorf("cannot convert %T to qvalue", v) + } } - case *decimal.Decimal: - qrow = append(qrow, qvalue.QValueNumeric{Val: *v}) - default: - return nil, fmt.Errorf("cannot convert %T to qvalue", v) + batch.Records = append(batch.Records, qrow) } - } - batch.Records = append(batch.Records, qrow) + + return nil + }, + }); err != nil { + return nil, err } - return batch, rows.Err() + return batch, nil } func SetupSuite(t *testing.T) ClickHouseSuite { @@ -195,7 +317,7 @@ func SetupSuite(t *testing.T) ClickHouseSuite { ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig()) require.NoError(t, err, "failed to connect to clickhouse") - err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix) + err = ch.Do(context.Background(), chgo.Query{Body: "CREATE DATABASE e2e_test_" + suffix}) require.NoError(t, err, "failed to create clickhouse database") return s diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index a19e69c8c7..cae237cc55 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + chgo "github.com/ClickHouse/ch-go" "github.com/shopspring/decimal" "github.com/stretchr/testify/require" @@ -508,7 +509,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) { // now test weird names with rename based resync ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) require.NoError(s.t, err) - require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("DROP TABLE `%s`", dstTableName))) + require.NoError(s.t, ch.Do(context.Background(), chgo.Query{Body: fmt.Sprintf("DROP TABLE `%s`", dstTableName)})) require.NoError(s.t, ch.Close()) flowConnConfig.Resync = true env = e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) @@ -526,7 +527,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) { // now test weird names with exchange based resync ch, err = connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) require.NoError(s.t, err) - require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE `%s`", dstTableName))) + require.NoError(s.t, ch.Do(context.Background(), chgo.Query{Body: fmt.Sprintf("TRUNCATE TABLE `%s`", dstTableName)})) require.NoError(s.t, ch.Close()) env = e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) diff --git a/flow/go.mod b/flow/go.mod index a11ffb5a7e..b3685f49d2 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -10,7 +10,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.3 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.3.0 - github.com/ClickHouse/clickhouse-go/v2 v2.30.0 + github.com/ClickHouse/ch-go v0.63.1 github.com/PeerDB-io/glua64 v1.0.1 github.com/PeerDB-io/gluabit32 v1.0.2 github.com/PeerDB-io/gluaflatbuffers v1.0.1 @@ -56,13 +56,9 @@ require ( go.opentelemetry.io/otel v1.32.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 go.opentelemetry.io/otel/metric v1.32.0 go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 - go.opentelemetry.io/otel/trace v1.32.0 go.temporal.io/api v1.41.0 go.temporal.io/sdk v1.30.0 go.temporal.io/sdk/contrib/opentelemetry v0.6.0 @@ -85,7 +81,6 @@ require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect github.com/BurntSushi/toml v1.4.0 // indirect - github.com/ClickHouse/ch-go v0.63.1 // indirect github.com/DataDog/zstd v1.5.6 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.49.0 // indirect @@ -109,6 +104,7 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/danieljoos/wincred v1.2.2 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/dmarkham/enumer v1.5.10 // indirect github.com/dvsekhvalnov/jose2go v1.7.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -132,6 +128,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -147,7 +144,7 @@ require ( github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nexus-rpc/sdk-go v0.0.11 // indirect - github.com/paulmach/orb v0.11.1 // indirect + github.com/pascaldekloe/name v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/prometheus/client_golang v1.20.5 // indirect @@ -162,7 +159,11 @@ require ( go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/term v0.25.0 // indirect google.golang.org/grpc/stats/opentelemetry v0.0.0-20241028142157-ada6787961b3 // indirect @@ -185,7 +186,6 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 // indirect github.com/Azure/go-amqp v1.2.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.3.0 // indirect - github.com/andybalholm/brotli v1.1.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.22 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.22 // indirect diff --git a/flow/go.sum b/flow/go.sum index 7a0380da03..4643ee0ad7 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -70,8 +70,6 @@ github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0 github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/ClickHouse/ch-go v0.63.1 h1:s2JyZvWLTCSAGdtjMBBmAgQQHMco6pawLJMOXi0FODM= github.com/ClickHouse/ch-go v0.63.1/go.mod h1:I1kJJCL3WJcBMGe1m+HVK0+nREaG+JOYYBWjrDrF3R0= -github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h2lrmGGk17dhFo= -github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo= github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY= github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 h1:3c8yed4lgqTt+oTQ+JNMDo+F4xprBf+O/il4ZC0nRLw= @@ -100,8 +98,6 @@ github.com/alecthomas/assert/v2 v2.11.0 h1:2Q9r3ki8+JYXvGsDyBXwH3LcJ+WK5D0gc5E8v github.com/alecthomas/assert/v2 v2.11.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= -github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= -github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= github.com/aws/aws-sdk-go-v2 v1.32.3 h1:T0dRlFBKcdaUPGNtkBSwHZxrtis8CQU17UpNBZYd0wk= @@ -192,6 +188,8 @@ github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE= github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4= github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg= +github.com/dmarkham/enumer v1.5.10 h1:ygL0L6quiTiH1jpp68DyvsWaea6MaZLZrTTkIS++R0M= +github.com/dmarkham/enumer v1.5.10/go.mod h1:e4VILe2b1nYK3JKJpRmNdl5xbDQvELc6tQ8b+GsGk6E= github.com/dvsekhvalnov/jose2go v1.7.0 h1:bnQc8+GMnidJZA8zc6lLEAb4xNrIqHwO+9TzqvtQZPo= github.com/dvsekhvalnov/jose2go v1.7.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= @@ -276,7 +274,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -291,9 +288,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -328,6 +323,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= @@ -360,7 +357,6 @@ github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6 h1:IsMZxCuZqKu github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeWNIJaW+O5xpRQbPp0Ybqu1vJd/pm7s2F473HRrkw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= @@ -403,7 +399,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -418,9 +413,8 @@ github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= -github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= -github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= -github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= +github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0= +github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -486,7 +480,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= github.com/twmb/franz-go/pkg/kadm v1.14.0 h1:nAn1co1lXzJQocpzyIyOFOjUBf4WHWs5/fTprXy2IZs= @@ -501,12 +494,6 @@ github.com/urfave/cli/v3 v3.0.0-alpha9.2 h1:CL8llQj3dGRLVQQzHxS+ZYRLanOuhyK1fXgL github.com/urfave/cli/v3 v3.0.0-alpha9.2/go.mod h1:FnIeEMYu+ko8zP1F9Ypr3xkZMIDqW3DR92yUtY39q1Y= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= -github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= -github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -520,7 +507,6 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw= go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg= -go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/detectors/gcp v1.31.0 h1:G1JQOreVrfhRkner+l4mrGxmfqYCAuy76asTDAo0xsA= @@ -535,12 +521,6 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 h1:9kV11HXBHZAvuPUZxmMWrH8hZn/6UnHX4K0mu36vNsU= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0/go.mod h1:JyA0FHXe22E1NeNiHmVp7kFHglnexDQ7uRWDiiJ1hKQ= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= @@ -558,17 +538,22 @@ go.temporal.io/sdk v1.30.0/go.mod h1:Pv45F/fVDgWKx+jhix5t/dGgqROVaI+VjPLd3CHWqq0 go.temporal.io/sdk/contrib/opentelemetry v0.6.0 h1:rNBArDj5iTUkcMwKocUShoAW59o6HdS7Nq4CTp4ldj8= go.temporal.io/sdk/contrib/opentelemetry v0.6.0/go.mod h1:Lem8VrE2ks8P+FYcRM3UphPoBr+tfM3v/Kaf0qStzSg= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -593,7 +578,6 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -614,9 +598,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -628,8 +610,6 @@ golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= @@ -687,8 +667,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From a28bce46fba0d3755685bd31304a59759751007b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 12 Oct 2024 14:59:53 +0000 Subject: [PATCH 2/4] OnResult sending empty blocks --- flow/connectors/clickhouse/cdc.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 53a1434606..9f7defa612 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,7 +2,6 @@ package connclickhouse import ( "context" - "errors" "fmt" "log/slog" "strings" @@ -18,8 +17,8 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` - dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists` + dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s` ) // getRawTableName returns the raw table name for the given table identifier. @@ -36,16 +35,13 @@ func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseNa {Name: "table_exists", Data: &existsC}, }, OnResult: func(ctx context.Context, block chproto.Block) error { - if block.Rows != 1 { - return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) - } - if block.Info.Overflows { - return errors.New("[clickhouse] checkIfTableExists: expected 1 row, got block with overflow") - } return nil }, }); err != nil { - return false, fmt.Errorf("error while reading result row: %w", err) + return false, fmt.Errorf("[clickhouse] checkIfTableExists: error in query: %w", err) + } + if len(existsC) != 1 { + return false, fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", len(existsC)) } return existsC[0] != 0, nil } From 97b5d0484037eae2fc15dd7986853a71a5a918d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 12 Oct 2024 15:37:28 +0000 Subject: [PATCH 3/4] Date32 --- flow/e2e/clickhouse/clickhouse.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index d736957ed2..d342227944 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -251,6 +251,8 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } else { qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Values.Row(idx)}) } + case *chproto.ColDate32: + qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Row(idx)}) case *chproto.ColDateTime64: qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Row(idx)}) case *chproto.ColNullable[chproto.Decimal32]: From bfc18cebf6ae5318aea352f674f98e44d98a4730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 10 Dec 2024 17:53:57 +0000 Subject: [PATCH 4/4] post rebase fixes --- flow/connectors/clickhouse/clickhouse.go | 36 ++------ flow/e2e/clickhouse/clickhouse.go | 101 ++++++++++------------- 2 files changed, 53 insertions(+), 84 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 8a7950fc3d..76f12c6673 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -11,6 +11,7 @@ import ( "maps" "net/url" "slices" + "strconv" "strings" "time" @@ -221,34 +222,14 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou tlsSetting.RootCAs = caPool } - /* - // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency - settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} - if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { - return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) - } else if maxInsertThreads != 0 { - settings["max_insert_threads"] = maxInsertThreads - } + // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency + settings := []ch.Setting{{Key: "select_sequential_consistency", Value: "1"}} + if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { + return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) + } else if maxInsertThreads != 0 { + settings = append(settings, ch.Setting{Key: "max_insert_threads", Value: strconv.FormatInt(maxInsertThreads, 10)}) + } - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, - Auth: clickhouse.Auth{ - Database: config.Database, - Username: config.User, - Password: config.Password, - }, - TLS: tlsSetting, - Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, - ClientInfo: clickhouse.ClientInfo{ - Products: []struct { - Name string - Version string - }{ - {Name: "peerdb"}, - }, - }, - Settings: settings, - */ conn, err := ch.Dial(ctx, ch.Options{ Address: fmt.Sprintf("%s:%d", config.Host, config.Port), Database: config.Database, @@ -259,6 +240,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou ClientName: "peerdb", DialTimeout: 3600 * time.Second, ReadTimeout: 3600 * time.Second, + Settings: settings, }) if err != nil { return nil, fmt.Errorf("failed to connect to ClickHouse peer: %w", err) diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index d342227944..c623ddd005 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -3,9 +3,9 @@ package e2e_clickhouse import ( "context" "encoding/binary" - "errors" "fmt" "math/big" + "slices" "strconv" "strings" "testing" @@ -96,21 +96,10 @@ func (s ClickHouseSuite) Teardown() { e2e.TearDownPostgres(s) } -// from clickhouse-go lib/column/bigint.go -func endianSwap(src []byte, not bool) { - for i := range len(src) / 2 { - if not { - src[i], src[len(src)-i-1] = ^src[len(src)-i-1], ^src[i] - } else { - src[i], src[len(src)-i-1] = src[len(src)-i-1], src[i] - } - } -} - // from clickhouse-go lib/column/bigint.go func rawToBigInt(v []byte, signed bool) *big.Int { // LittleEndian to BigEndian - endianSwap(v, false) + slices.Reverse(v) lt := new(big.Int) if signed && len(v) > 0 && v[0]&0x80 != 0 { // [0] ^ will +1 @@ -130,23 +119,24 @@ func rawToBigInt(v []byte, signed bool) *big.Int { func decimalRow(col chproto.ColResult, i int) decimal.Decimal { typ := string(col.Type()) lparam := strings.LastIndex(typ, "(") - if lparam == -1 { - panic("no ( in " + typ) - } - params := typ[lparam+1:] - rparam := strings.Index(params, ")") - if rparam == -1 { - panic("no ) in params " + params + " of " + typ) - } - params = typ[:rparam] - _, scaleStr, ok := strings.Cut(params, ",") - if !ok { - panic("no , in params " + params + " of " + typ) - } - scaleStr = strings.TrimSpace(scaleStr) - scale, err := strconv.Atoi(scaleStr) - if err != nil { - panic("failed to parse scale " + scaleStr + ": " + err.Error()) + scale := 0 + if lparam != -1 { + params := typ[lparam+1:] + rparam := strings.Index(params, ")") + if rparam == -1 { + panic("no ) in params " + params + " of " + typ) + } + params = typ[:rparam] + _, scaleStr, ok := strings.Cut(params, ",") + if !ok { + panic("no , in params " + params + " of " + typ) + } + scaleStr = strings.TrimSpace(scaleStr) + var err error + scale, err = strconv.Atoi(scaleStr) + if err != nil { + panic("failed to parse scale " + scaleStr + ": " + err.Error()) + } } var value decimal.Decimal @@ -184,43 +174,24 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } defer ch.Close() - firstCol, _, _ := strings.Cut(cols, ",") - if firstCol == "" { - return nil, errors.New("no columns specified") - } batch := &model.QRecordBatch{} var schema chproto.Results if err := ch.Do(context.Background(), chgo.Query{ - Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, - cols, table, firstCol), + Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table), Result: schema.Auto(), OnResult: func(ctx context.Context, block chproto.Block) error { if len(batch.Schema.Fields) == 0 { - for _, col := range schema { - nullable := strings.HasPrefix(string(col.Data.Type()), "Nullable(") - var qkind qvalue.QValueKind - switch col.Data.Type() { - case "String", "Nullable(String)": - qkind = qvalue.QValueKindString - case "Int32", "Nullable(Int32)": - qkind = qvalue.QValueKindInt32 - case "DateTime64(6)", "Nullable(DateTime64(6))": - qkind = qvalue.QValueKindTimestamp - case "Date32", "Nullable(Date32)": - qkind = qvalue.QValueKindDate - default: - if strings.Contains(string(col.Data.Type()), "Decimal") { - qkind = qvalue.QValueKindNumeric - } else { - return fmt.Errorf("failed to resolve QValueKind for %s", col.Data.Type()) - } - } + tableSchema, err := connclickhouse.GetTableSchemaForTable(table, schema) + if err != nil { + return err + } + for _, col := range tableSchema.Columns { batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{ Name: col.Name, - Type: qkind, + Type: qvalue.QValueKind(col.Type), Precision: 0, Scale: 0, - Nullable: nullable, + Nullable: col.Nullable, }) } } @@ -237,6 +208,14 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } case *chproto.ColStr: qrow = append(qrow, qvalue.QValueString{Val: v.Row(idx)}) + case *chproto.ColNullable[int16]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) + } else { + qrow = append(qrow, qvalue.QValueInt16{Val: v.Values.Row(idx)}) + } + case *chproto.ColInt16: + qrow = append(qrow, qvalue.QValueInt16{Val: v.Row(idx)}) case *chproto.ColNullable[int32]: if v.Nulls[idx] != 0 { qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) @@ -245,6 +224,14 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch } case *chproto.ColInt32: qrow = append(qrow, qvalue.QValueInt32{Val: v.Row(idx)}) + case *chproto.ColNullable[int64]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) + } else { + qrow = append(qrow, qvalue.QValueInt64{Val: v.Values.Row(idx)}) + } + case *chproto.ColInt64: + qrow = append(qrow, qvalue.QValueInt64{Val: v.Row(idx)}) case *chproto.ColNullable[time.Time]: if v.Nulls[idx] != 0 { qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp))