diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 1c0b651ad0..f09dcff9aa 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,14 +2,12 @@ package connclickhouse import ( "context" - "database/sql" - "errors" "fmt" "log/slog" "strings" - _ "github.com/ClickHouse/clickhouse-go/v2" - _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -19,7 +17,7 @@ import ( ) const ( - checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;` + checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = %s AND name = %s) AS table_exists;` dropTableIfExistsSQL = `DROP TABLE IF EXISTS %s;` ) @@ -29,17 +27,23 @@ func (c *ClickHouseConnector) getRawTableName(flowJobName string) string { } func (c *ClickHouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) { - var result sql.NullInt32 - err := c.queryRow(ctx, checkIfTableExistsSQL, databaseName, tableIdentifier).Scan(&result) - if err != nil { + // TODO escape + var existsC chproto.ColBool + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf(checkIfTableExistsSQL, "'"+databaseName+"'", "'"+tableIdentifier+"'"), + Result: chproto.Results{ + {Name: "table_exists", Data: &existsC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + if block.Rows != 1 { + return fmt.Errorf("[clickhouse] checkIfTableExists: expected 1 row, got %d", block.Rows) + } + return nil + }, + }); err != nil { return false, fmt.Errorf("error while reading result row: %w", err) } - - if !result.Valid { - return false, errors.New("[clickhouse] checkIfTableExists: result is not valid") - } - - return result.Int32 == 1, nil + return existsC[0], nil } func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index f76ebb480e..f167afe42c 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -14,8 +14,8 @@ import ( "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/aws/aws-sdk-go-v2/aws" "go.temporal.io/sdk/log" "golang.org/x/mod/semver" @@ -30,7 +30,7 @@ import ( type ClickHouseConnector struct { *metadataStore.PostgresMetadata - database clickhouse.Conn + database *ch.Client logger log.Logger config *protos.ClickhouseConfig credsProvider *utils.ClickHouseS3Credentials @@ -183,22 +183,12 @@ func NewClickHouseConnector( // This is the minimum version of ClickHouse that actually supports session token // https://github.com/ClickHouse/ClickHouse/issues/61230 minSupportedClickHouseVersion := "v24.3.1" - clickHouseVersionRow := database.QueryRow(ctx, "SELECT version()") - var clickHouseVersion string - err := clickHouseVersionRow.Scan(&clickHouseVersion) - if err != nil { - return nil, fmt.Errorf("failed to query ClickHouse version: %w", err) - } - // Ignore everything after patch version and prefix with "v", else semver.Compare will fail - versionParts := strings.SplitN(clickHouseVersion, ".", 4) - if len(versionParts) > 3 { - versionParts = versionParts[:3] - } - cleanedClickHouseVersion := "v" + strings.Join(versionParts, ".") + serverInfo := database.ServerInfo() + cleanedClickHouseVersion := fmt.Sprintf("v%d.%d.%d", serverInfo.Major, serverInfo.Minor, serverInfo.Revision) if semver.Compare(cleanedClickHouseVersion, minSupportedClickHouseVersion) < 0 { return nil, fmt.Errorf( "provide S3 Transient Stage details explicitly or upgrade to ClickHouse version >= %v, current version is %s. %s", - minSupportedClickHouseVersion, clickHouseVersion, + minSupportedClickHouseVersion, cleanedClickHouseVersion, "You can also contact PeerDB support for implicit S3 stage setup for older versions of ClickHouse.") } } @@ -213,7 +203,7 @@ func NewClickHouseConnector( }, nil } -func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) { +func Connect(ctx context.Context, config *protos.ClickhouseConfig) (*ch.Client, error) { var tlsSetting *tls.Config if !config.DisableTls { tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13} @@ -236,23 +226,14 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C tlsSetting.RootCAs = caPool } - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, - Auth: clickhouse.Auth{ - Database: config.Database, - Username: config.User, - Password: config.Password, - }, + conn, err := ch.Dial(ctx, ch.Options{ + Address: fmt.Sprintf("%s:%d", config.Host, config.Port), + Database: config.Database, + User: config.User, + Password: config.Password, TLS: tlsSetting, - Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, - ClientInfo: clickhouse.ClientInfo{ - Products: []struct { - Name string - Version string - }{ - {Name: "peerdb"}, - }, - }, + Compression: ch.CompressionLZ4, + ClientName: "peerdb", DialTimeout: 3600 * time.Second, ReadTimeout: 3600 * time.Second, }) @@ -290,21 +271,20 @@ var retryableExceptions = map[int32]struct{}{ } func isRetryableException(err error) bool { - if ex, ok := err.(*clickhouse.Exception); ok { + if ex, ok := err.(*ch.Exception); ok { if ex == nil { return false } - _, yes := retryableExceptions[ex.Code] + _, yes := retryableExceptions[int32(ex.Code)] return yes } return errors.Is(err, io.EOF) } -//nolint:unparam -func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...any) error { +func (c *ClickHouseConnector) exec(ctx context.Context, query string) error { var err error for i := range 5 { - err = c.database.Exec(ctx, query, args...) + err = c.database.Do(ctx, ch.Query{Body: query}) if !isRetryableException(err) { break } @@ -316,11 +296,10 @@ func (c *ClickHouseConnector) exec(ctx context.Context, query string, args ...an return err } -func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...any) (driver.Rows, error) { - var rows driver.Rows +func (c *ClickHouseConnector) query(ctx context.Context, query ch.Query) error { var err error for i := range 5 { - rows, err = c.database.Query(ctx, query, args...) + err = c.database.Do(ctx, query) if !isRetryableException(err) { break } @@ -329,23 +308,7 @@ func (c *ClickHouseConnector) query(ctx context.Context, query string, args ...a time.Sleep(time.Second * time.Duration(i*5+1)) } } - return rows, err -} - -func (c *ClickHouseConnector) queryRow(ctx context.Context, query string, args ...any) driver.Row { - var row driver.Row - for i := range 5 { - row = c.database.QueryRow(ctx, query, args...) - err := row.Err() - if !isRetryableException(err) { - break - } - c.logger.Info("[queryRow] retryable error", slog.Any("error", row.Err()), slog.Any("query", query), slog.Int64("i", int64(i))) - if i < 4 { - time.Sleep(time.Second * time.Duration(i*5+1)) - } - } - return row + return err } func (c *ClickHouseConnector) Close() error { @@ -365,72 +328,80 @@ func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error { func (c *ClickHouseConnector) execWithLogging(ctx context.Context, query string) error { c.logger.Info("[clickhouse] executing DDL statement", slog.String("query", query)) - return c.database.Exec(ctx, query) + return c.exec(ctx, query) } func (c *ClickHouseConnector) checkTablesEmptyAndEngine(ctx context.Context, tables []string) error { - queryInput := make([]interface{}, 0, len(tables)+1) - queryInput = append(queryInput, c.config.Database) + escapedTables := make([]string, 0, len(tables)) for _, table := range tables { - queryInput = append(queryInput, table) - } - rows, err := c.query(ctx, - fmt.Sprintf("SELECT name,engine,total_rows FROM system.tables WHERE database=? AND table IN (%s)", - strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) - if err != nil { + // TODO proper + escapedTables = append(escapedTables, "'"+table+"'") + } + var nameC chproto.ColStr + var engineC chproto.ColStr + var totalRowsC chproto.ColUInt64 + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf( + "SELECT name,engine,total_rows FROM system.tables WHERE database='%s' AND table IN (%s)", + c.config.Database, strings.Join(escapedTables, ",")), + Result: chproto.Results{ + {Name: "name", Data: &nameC}, + {Name: "engine", Data: &engineC}, + {Name: "total_rows", Data: &totalRowsC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + for idx := range block.Rows { + name := nameC.Row(idx) + engine := engineC.Row(idx) + totalRows := totalRowsC[idx] + if totalRows != 0 { + return fmt.Errorf("table %s exists and is not empty", name) + } + if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) { + c.logger.Warn("[clickhouse] table engine not explicitly supported", + slog.String("table", name), slog.String("engine", engine)) + } + } + return nil + }, + }); err != nil { return fmt.Errorf("failed to get information for destination tables: %w", err) } - defer rows.Close() - - for rows.Next() { - var tableName, engine string - var totalRows uint64 - err = rows.Scan(&tableName, &engine, &totalRows) - if err != nil { - return fmt.Errorf("failed to scan information for tables: %w", err) - } - if totalRows != 0 { - return fmt.Errorf("table %s exists and is not empty", tableName) - } - if !slices.Contains(acceptableTableEngines, strings.TrimPrefix(engine, "Shared")) { - c.logger.Warn("[clickhouse] table engine not explicitly supported", - slog.String("table", tableName), slog.String("engine", engine)) - } - } - if rows.Err() != nil { - return fmt.Errorf("failed to read rows: %w", rows.Err()) - } return nil } -func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, - tables []string, -) (map[string][]*protos.FieldDescription, error) { - tableColumnsMapping := make(map[string][]*protos.FieldDescription, len(tables)) - queryInput := make([]interface{}, 0, len(tables)+1) - queryInput = append(queryInput, c.config.Database) +func (c *ClickHouseConnector) getTableColumnsMapping(ctx context.Context, tables []string) (map[string][]*protos.FieldDescription, error) { + escapedTables := make([]string, 0, len(tables)) for _, table := range tables { - queryInput = append(queryInput, table) - } - rows, err := c.query(ctx, - fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=? AND table IN (%s)", - strings.Join(slices.Repeat([]string{"?"}, len(tables)), ",")), queryInput...) - if err != nil { + // TODO proper + escapedTables = append(escapedTables, "'"+table+"'") + } + tableColumnsMapping := make(map[string][]*protos.FieldDescription) + var nameC chproto.ColStr + var typeC chproto.ColStr + var tableC chproto.ColStr + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf("SELECT name,type,table FROM system.columns WHERE database=%s AND table IN (%s)", + c.config.Database, strings.Join(escapedTables, ",")), + Result: chproto.Results{ + {Name: "name", Data: &nameC}, + {Name: "type", Data: &typeC}, + {Name: "table", Data: &tableC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + for idx := range block.Rows { + table := tableC.Row(idx) + tableColumnsMapping[table] = append(tableColumnsMapping[table], &protos.FieldDescription{ + Name: nameC.Row(idx), + Type: typeC.Row(idx), + }) + } + return nil + }, + }, + ); err != nil { return nil, fmt.Errorf("failed to get columns for destination tables: %w", err) } - defer rows.Close() - for rows.Next() { - var tableName string - var fieldDescription protos.FieldDescription - err = rows.Scan(&fieldDescription.Name, &fieldDescription.Type, &tableName) - if err != nil { - return nil, fmt.Errorf("failed to scan columns for tables: %w", err) - } - tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], &fieldDescription) - } - if rows.Err() != nil { - return nil, fmt.Errorf("failed to read rows: %w", rows.Err()) - } return tableColumnsMapping, nil } diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 8a44f17ffd..3b0e24a78e 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -3,14 +3,15 @@ package connclickhouse import ( "cmp" "context" - "database/sql" - "errors" "fmt" "slices" "strconv" "strings" "time" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" + "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" @@ -438,34 +439,25 @@ func (c *ClickHouseConnector) getDistinctTableNamesInBatch( ) ([]string, error) { rawTbl := c.getRawTableName(flowJobName) - q := fmt.Sprintf( - `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`, - rawTbl, normalizeBatchID, syncBatchID) - - rows, err := c.query(ctx, q) - if err != nil { - return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) - } - defer rows.Close() var tableNames []string - for rows.Next() { - var tableName sql.NullString - err = rows.Scan(&tableName) - if err != nil { - return nil, fmt.Errorf("error while scanning table name: %w", err) - } - - if !tableName.Valid { - return nil, errors.New("table name is not valid") - } - - tableNames = append(tableNames, tableName.String) - } - - if rows.Err() != nil { - return nil, fmt.Errorf("failed to read rows: %w", err) + var tableNameC chproto.ColStr + if err := c.query(ctx, ch.Query{ + Body: fmt.Sprintf( + `SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`, + rawTbl, normalizeBatchID, syncBatchID), + Result: chproto.Results{ + {Name: "_peerdb_destination_table_name", Data: &tableNameC}, + }, + OnResult: func(ctx context.Context, block chproto.Block) error { + tableNames := slices.Grow(tableNames, block.Rows) + return tableNameC.ForEach(func(i int, s string) error { + tableNames = append(tableNames, s) + return nil + }) + }, + }); err != nil { + return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err) } - return tableNames, nil } diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 1d5b2c1d71..5de6b05d2f 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -6,7 +6,8 @@ import ( "log/slog" "strings" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -44,15 +45,18 @@ func (c *ClickHouseConnector) SyncQRepRecords( return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream) } -func (c *ClickHouseConnector) getTableSchema(ctx context.Context, tableName string) ([]driver.ColumnType, error) { +func (c *ClickHouseConnector) getTableSchema(ctx context.Context, tableName string) (chproto.Results, error) { + // TODO escape queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName) - rows, err := c.query(ctx, queryString) - if err != nil { + var schema chproto.Results + if err := c.query(ctx, ch.Query{ + Body: queryString, + Result: schema.Auto(), + }); err != nil { return nil, fmt.Errorf("failed to execute query: %w", err) } - defer rows.Close() - return rows.ColumnTypes(), nil + return schema, nil } func (c *ClickHouseConnector) ConsolidateQRepPartitions(_ context.Context, config *protos.QRepConfig) error { diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index b507f448e5..6af056b15f 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -6,7 +6,7 @@ import ( "log/slog" "strings" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/PeerDB-io/peer-flow/connectors/utils" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" @@ -54,7 +54,7 @@ func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a s.config.DestinationTableIdentifier, avroFileUrl, creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) - return s.connector.database.Exec(ctx, query) + return s.connector.exec(ctx, query) } func (s *ClickHouseAvroSyncMethod) SyncRecords( @@ -98,7 +98,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, - dstTableSchema []driver.ColumnType, + dstTableSchema chproto.Results, stream *model.QRecordStream, ) (int, error) { dstTableName := config.DestinationTableIdentifier @@ -129,7 +129,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( avroFileUrl := utils.FileURLForS3Service(endpoint, region, s3o.Bucket, avroFile.FilePath) selector := make([]string, 0, len(dstTableSchema)) for _, col := range dstTableSchema { - colName := col.Name() + colName := col.Name if strings.EqualFold(colName, config.SoftDeleteColName) || strings.EqualFold(colName, signColName) || strings.EqualFold(colName, config.SyncedAtColName) || @@ -149,7 +149,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( config.DestinationTableIdentifier, selectorStr, selectorStr, avroFileUrl, creds.AWS.AccessKeyID, creds.AWS.SecretAccessKey, sessionTokenPart) - if err := s.connector.database.Exec(ctx, query); err != nil { + if err := s.connector.exec(ctx, query); err != nil { s.connector.logger.Error("Failed to insert into select for ClickHouse", slog.Any("error", err)) return 0, err } diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 891fe55365..4ff697f263 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -2,13 +2,17 @@ package e2e_clickhouse import ( "context" + "encoding/binary" "errors" "fmt" - "reflect" + "math/big" + "strconv" "strings" "testing" "time" + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/jackc/pgx/v5" "github.com/shopspring/decimal" "github.com/stretchr/testify/require" @@ -87,8 +91,89 @@ func (s ClickHouseSuite) Teardown() { e2e.TearDownPostgres(s) } +// from clickhouse-go lib/column/bigint.go +func endianSwap(src []byte, not bool) { + for i := range len(src) / 2 { + if not { + src[i], src[len(src)-i-1] = ^src[len(src)-i-1], ^src[i] + } else { + src[i], src[len(src)-i-1] = src[len(src)-i-1], src[i] + } + } +} + +// from clickhouse-go lib/column/bigint.go +func rawToBigInt(v []byte, signed bool) *big.Int { + // LittleEndian to BigEndian + endianSwap(v, false) + lt := new(big.Int) + if signed && len(v) > 0 && v[0]&0x80 != 0 { + // [0] ^ will +1 + for i := 0; i < len(v); i++ { + v[i] = ^v[i] + } + lt.SetBytes(v) + // neg ^ will -1 + lt.Not(lt) + } else { + lt.SetBytes(v) + } + return lt +} + +// largely taken from clickhouse-go lib/column/decimal.go +func decimalRow(col chproto.ColResult, i int) decimal.Decimal { + typ := string(col.Type()) + lparam := strings.LastIndex(typ, "(") + if lparam == -1 { + panic("no ( in " + typ) + } + params := typ[lparam+1:] + rparam := strings.Index(params, ")") + if rparam == -1 { + panic("no ) in params " + params + " of " + typ) + } + params = typ[:rparam] + _, scaleStr, ok := strings.Cut(params, ",") + if !ok { + panic("no , in params " + params + " of " + typ) + } + scaleStr = strings.TrimSpace(scaleStr) + scale, err := strconv.Atoi(scaleStr) + if err != nil { + panic("failed to parse scale " + scaleStr + ": " + err.Error()) + } + + var value decimal.Decimal + switch vCol := col.(type) { + case *chproto.ColDecimal32: + v := vCol.Row(i) + value = decimal.New(int64(v), int32(-scale)) + case *chproto.ColDecimal64: + v := vCol.Row(i) + value = decimal.New(int64(v), int32(-scale)) + case *chproto.ColDecimal128: + v := vCol.Row(i) + b := make([]byte, 16) + binary.LittleEndian.PutUint64(b[0:64/8], v.Low) + binary.LittleEndian.PutUint64(b[64/8:128/8], v.High) + bv := rawToBigInt(b, true) + value = decimal.NewFromBigInt(bv, int32(-scale)) + case *chproto.ColDecimal256: + v := vCol.Row(i) + b := make([]byte, 32) + binary.LittleEndian.PutUint64(b[0:64/8], v.Low.Low) + binary.LittleEndian.PutUint64(b[64/8:128/8], v.Low.High) + binary.LittleEndian.PutUint64(b[128/8:192/8], v.High.Low) + binary.LittleEndian.PutUint64(b[192/8:256/8], v.High.High) + bv := rawToBigInt(b, true) + value = decimal.NewFromBigInt(bv, int32(-scale)) + } + return value +} + func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) { - ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) + chc, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) if err != nil { return nil, err } @@ -97,93 +182,115 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch if firstCol == "" { return nil, errors.New("no columns specified") } - rows, err := ch.Query( - context.Background(), - fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, cols, table, firstCol), - ) - if err != nil { - return nil, err - } - batch := &model.QRecordBatch{} - types := rows.ColumnTypes() - row := make([]interface{}, 0, len(types)) - for _, ty := range types { - nullable := ty.Nullable() - row = append(row, reflect.New(ty.ScanType()).Interface()) - var qkind qvalue.QValueKind - switch ty.DatabaseTypeName() { - case "String", "Nullable(String)": - qkind = qvalue.QValueKindString - case "Int32", "Nullable(Int32)": - qkind = qvalue.QValueKindInt32 - case "DateTime64(6)", "Nullable(DateTime64(6))": - qkind = qvalue.QValueKindTimestamp - case "Date32", "Nullable(Date32)": - qkind = qvalue.QValueKindDate - default: - if strings.Contains(ty.DatabaseTypeName(), "Decimal") { - qkind = qvalue.QValueKindNumeric - } else { - return nil, fmt.Errorf("failed to resolve QValueKind for %s", ty.DatabaseTypeName()) + var schema chproto.Results + if err := chc.Do(context.Background(), ch.Query{ + Body: fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, + cols, table, firstCol), + Result: schema.Auto(), + OnResult: func(ctx context.Context, block chproto.Block) error { + if len(batch.Schema.Fields) == 0 { + for _, col := range schema { + nullable := strings.HasPrefix(string(col.Data.Type()), "Nullable(") + var qkind qvalue.QValueKind + switch col.Data.Type() { + case "String", "Nullable(String)": + qkind = qvalue.QValueKindString + case "Int32", "Nullable(Int32)": + qkind = qvalue.QValueKindInt32 + case "DateTime64(6)", "Nullable(DateTime64(6))": + qkind = qvalue.QValueKindTimestamp + case "Date32", "Nullable(Date32)": + qkind = qvalue.QValueKindDate + default: + if strings.Contains(string(col.Data.Type()), "Decimal") { + qkind = qvalue.QValueKindNumeric + } else { + return fmt.Errorf("failed to resolve QValueKind for %s", col.Data.Type()) + } + } + batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{ + Name: col.Name, + Type: qkind, + Precision: 0, + Scale: 0, + Nullable: nullable, + }) + } } - } - batch.Schema.Fields = append(batch.Schema.Fields, qvalue.QField{ - Name: ty.Name(), - Type: qkind, - Precision: 0, - Scale: 0, - Nullable: nullable, - }) - } - for rows.Next() { - if err := rows.Scan(row...); err != nil { - return nil, err - } - qrow := make([]qvalue.QValue, 0, len(row)) - for _, val := range row { - switch v := val.(type) { - case **string: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindString)) - } else { - qrow = append(qrow, qvalue.QValueString{Val: **v}) - } - case *string: - qrow = append(qrow, qvalue.QValueString{Val: *v}) - case **int32: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) - } else { - qrow = append(qrow, qvalue.QValueInt32{Val: **v}) - } - case *int32: - qrow = append(qrow, qvalue.QValueInt32{Val: *v}) - case **time.Time: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp)) - } else { - qrow = append(qrow, qvalue.QValueTimestamp{Val: **v}) - } - case *time.Time: - qrow = append(qrow, qvalue.QValueTimestamp{Val: *v}) - case **decimal.Decimal: - if *v == nil { - qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) - } else { - qrow = append(qrow, qvalue.QValueNumeric{Val: **v}) + for idx := range block.Rows { + qrow := make([]qvalue.QValue, 0, block.Columns) + for _, col := range schema { + switch v := col.Data.(type) { + case *chproto.ColNullable[string]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindString)) + } else { + qrow = append(qrow, qvalue.QValueString{Val: v.Values.Row(idx)}) + } + case *chproto.ColStr: + qrow = append(qrow, qvalue.QValueString{Val: v.Row(idx)}) + case *chproto.ColNullable[int32]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindInt32)) + } else { + qrow = append(qrow, qvalue.QValueInt32{Val: v.Values.Row(idx)}) + } + case *chproto.ColInt32: + qrow = append(qrow, qvalue.QValueInt32{Val: v.Row(idx)}) + case *chproto.ColNullable[time.Time]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindTimestamp)) + } else { + qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Values.Row(idx)}) + } + case *chproto.ColDateTime64: + qrow = append(qrow, qvalue.QValueTimestamp{Val: v.Row(idx)}) + case *chproto.ColNullable[chproto.Decimal32]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColNullable[chproto.Decimal64]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColNullable[chproto.Decimal128]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColNullable[chproto.Decimal256]: + if v.Nulls[idx] != 0 { + qrow = append(qrow, qvalue.QValueNull(qvalue.QValueKindNumeric)) + } else { + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v.Values, idx)}) + } + case *chproto.ColDecimal32, + *chproto.ColDecimal64, + *chproto.ColDecimal128, + *chproto.ColDecimal256: + qrow = append(qrow, qvalue.QValueNumeric{Val: decimalRow(v, idx)}) + default: + return fmt.Errorf("cannot convert %T to qvalue", v) + } } - case *decimal.Decimal: - qrow = append(qrow, qvalue.QValueNumeric{Val: *v}) - default: - return nil, fmt.Errorf("cannot convert %T to qvalue", v) + batch.Records = append(batch.Records, qrow) } - } - batch.Records = append(batch.Records, qrow) + + return nil + }, + }, + ); err != nil { + return nil, err } - return batch, rows.Err() + return batch, nil } func SetupSuite(t *testing.T) ClickHouseSuite { @@ -203,9 +310,9 @@ func SetupSuite(t *testing.T) ClickHouseSuite { s3Helper: s3Helper, } - ch, err := connclickhouse.Connect(context.Background(), s.PeerForDatabase("default").GetClickhouseConfig()) + chc, err := connclickhouse.Connect(context.Background(), s.PeerForDatabase("default").GetClickhouseConfig()) require.NoError(t, err, "failed to connect to clickhouse") - err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix) + err = chc.Do(context.Background(), ch.Query{Body: "CREATE DATABASE e2e_test_" + suffix}) require.NoError(t, err, "failed to create clickhouse database") return s diff --git a/flow/go.mod b/flow/go.mod index 87201bebbe..b9f6660395 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -10,7 +10,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.3.0 - github.com/ClickHouse/clickhouse-go/v2 v2.29.0 + github.com/ClickHouse/ch-go v0.62.0 github.com/PeerDB-io/glua64 v1.0.1 github.com/PeerDB-io/gluabit32 v1.0.2 github.com/PeerDB-io/gluaflatbuffers v1.0.1 @@ -78,7 +78,6 @@ require ( cloud.google.com/go/monitoring v1.21.1 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect - github.com/ClickHouse/ch-go v0.62.0 // indirect github.com/DataDog/zstd v1.5.6 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.2 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.48.2 // indirect @@ -102,6 +101,7 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/danieljoos/wincred v1.2.2 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/dmarkham/enumer v1.5.9 // indirect github.com/dvsekhvalnov/jose2go v1.7.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/envoyproxy/go-control-plane v0.13.0 // indirect @@ -116,6 +116,7 @@ require ( github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/hashicorp/go-version v1.6.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect @@ -126,7 +127,7 @@ require ( github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nexus-rpc/sdk-go v0.0.10 // indirect - github.com/paulmach/orb v0.11.1 // indirect + github.com/pascaldekloe/name v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/prometheus/client_golang v1.20.4 // indirect @@ -142,6 +143,9 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect go.opentelemetry.io/otel/trace v1.30.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/term v0.25.0 // indirect google.golang.org/grpc/stats/opentelemetry v0.0.0-20241004113128-859602c14c6c // indirect ) @@ -154,7 +158,6 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 // indirect github.com/Azure/go-amqp v1.2.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect - github.com/andybalholm/brotli v1.1.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.19 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.19 // indirect diff --git a/flow/go.sum b/flow/go.sum index 0e3c31cf1e..e452b9d9c1 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -64,8 +64,6 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/ClickHouse/ch-go v0.62.0 h1:eXH0hytXeCEEZHgMvOX9IiW7wqBb4w1MJMp9rArbkrc= github.com/ClickHouse/ch-go v0.62.0/go.mod h1:uzso52/PD9+gZj7tL6XAo8/EYDrx7CIwNF4c6PnO6S0= -github.com/ClickHouse/clickhouse-go/v2 v2.29.0 h1:Dj1w59RssRyLgGHXtYaWU0eIM1pJsu9nGPi/btmvAqw= -github.com/ClickHouse/clickhouse-go/v2 v2.29.0/go.mod h1:bLookq6qZJ4Ush/6tOAnJGh1Sf3Sa/nQoMn71p7ZCUE= github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY= github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.2 h1:cZpsGsWTIFKymTA0je7IIvi1O7Es7apb9CF3EQlOcfE= @@ -94,8 +92,6 @@ github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZ github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= github.com/aws/aws-sdk-go-v2 v1.32.0 h1:GuHp7GvMN74PXD5C97KT5D87UhIy4bQPkflQKbfkndg= @@ -183,6 +179,8 @@ github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE= github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4= github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg= +github.com/dmarkham/enumer v1.5.9 h1:NM/1ma/AUNieHZg74w67GkHFBNB15muOt3sj486QVZk= +github.com/dmarkham/enumer v1.5.9/go.mod h1:e4VILe2b1nYK3JKJpRmNdl5xbDQvELc6tQ8b+GsGk6E= github.com/dvsekhvalnov/jose2go v1.7.0 h1:bnQc8+GMnidJZA8zc6lLEAb4xNrIqHwO+9TzqvtQZPo= github.com/dvsekhvalnov/jose2go v1.7.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= @@ -254,7 +252,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -267,9 +264,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -298,6 +293,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjw github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= @@ -324,7 +321,6 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= @@ -359,7 +355,6 @@ github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA= github.com/microsoft/go-mssqldb v1.7.2/go.mod h1:kOvZKUdrhhFQmxLZqbwUV0rHkNkZpthMITIb2Ko1IoA= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -370,9 +365,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= -github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= -github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= -github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= +github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0= +github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -433,7 +427,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ= github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60= @@ -446,12 +439,8 @@ github.com/twpayne/go-geos v0.18.1 h1:dzUHvkxcJHXTSPDqYBA39M+OE2myyqZO9ytBSMjS37 github.com/twpayne/go-geos v0.18.1/go.mod h1:H5qP0wfgtZOl2g+KT0WGKn2z2mr5XPnGbgGlUefaCOM= github.com/urfave/cli/v3 v3.0.0-alpha9 h1:P0RMy5fQm1AslQS+XCmy9UknDXctOmG/q/FZkUFnJSo= github.com/urfave/cli/v3 v3.0.0-alpha9/go.mod h1:0kK/RUFHyh+yIKSfWxwheGndfnrvYSmYFVeKCh03ZUc= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -465,7 +454,6 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw= go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg= -go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/detectors/gcp v1.30.0 h1:GF+YVnUeJwOy+Ag2cTEpVZq+r2Tnci42FIiNwA2gjME= @@ -495,15 +483,22 @@ go.temporal.io/api v1.39.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0= go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -528,7 +523,6 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -549,9 +543,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -563,8 +555,6 @@ golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= @@ -622,8 +612,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=