diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 8a7950fc3d..76f12c6673 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -11,6 +11,7 @@ import ( "maps" "net/url" "slices" + "strconv" "strings" "time" @@ -221,34 +222,14 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou tlsSetting.RootCAs = caPool } - /* - // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency - settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} - if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { - return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) - } else if maxInsertThreads != 0 { - settings["max_insert_threads"] = maxInsertThreads - } + // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency + settings := []ch.Setting{{Key: "select_sequential_consistency", Value: "1"}} + if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { + return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) + } else if maxInsertThreads != 0 { + settings = append(settings, ch.Setting{Key: "max_insert_threads", Value: strconv.FormatInt(maxInsertThreads, 10)}) + } - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, - Auth: clickhouse.Auth{ - Database: config.Database, - Username: config.User, - Password: config.Password, - }, - TLS: tlsSetting, - Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, - ClientInfo: clickhouse.ClientInfo{ - Products: []struct { - Name string - Version string - }{ - {Name: "peerdb"}, - }, - }, - Settings: settings, - */ conn, err := ch.Dial(ctx, ch.Options{ Address: fmt.Sprintf("%s:%d", config.Host, config.Port), Database: config.Database, @@ -259,6 +240,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou ClientName: "peerdb", DialTimeout: 3600 * time.Second, ReadTimeout: 3600 * time.Second, + Settings: settings, }) if err != nil { return nil, fmt.Errorf("failed to connect to ClickHouse peer: %w", err)