diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 4e8975701..7331ebfba 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -205,7 +205,7 @@ func NewClickHouseConnector( return connector, nil } -func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) { +func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) { var tlsSetting *tls.Config if !config.DisableTls { tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13} @@ -228,6 +228,13 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C tlsSetting.RootCAs = caPool } + var settings clickhouse.Settings + if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { + return nil, fmt.Errorf("failed to load max_insert_threads config") + } else if maxInsertThreads != 0 { + settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads} + } + conn, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, Auth: clickhouse.Auth{ @@ -245,6 +252,7 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C {Name: "peerdb"}, }, }, + Settings: settings, DialTimeout: 3600 * time.Second, ReadTimeout: 3600 * time.Second, }) diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 566c8ead1..ceb860e1c 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -172,6 +172,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, + { + Name: "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS", + Description: "Configures max_insert_threads setting on clickhouse for inserting into destination table", + DefaultValue: "0", + ValueType: protos.DynconfValueType_UINT, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, + TargetForSetting: protos.DynconfTarget_CLICKHOUSE, + }, { Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES", Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely", @@ -362,6 +370,10 @@ func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]str return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE") } +func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error) { + return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS") +} + func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) { return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM") }