diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 4e89757014..63ccea6937 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -128,7 +128,7 @@ func NewClickHouseConnector( config *protos.ClickhouseConfig, ) (*ClickHouseConnector, error) { logger := shared.LoggerFromCtx(ctx) - database, err := Connect(ctx, config) + database, err := Connect(ctx, env, config) if err != nil { return nil, fmt.Errorf("failed to open connection to ClickHouse peer: %w", err) } @@ -205,7 +205,7 @@ func NewClickHouseConnector( return connector, nil } -func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) { +func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) { var tlsSetting *tls.Config if !config.DisableTls { tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13} @@ -228,6 +228,13 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C tlsSetting.RootCAs = caPool } + var settings clickhouse.Settings + if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { + return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) + } else if maxInsertThreads != 0 { + settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads} + } + conn, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)}, Auth: clickhouse.Auth{ @@ -245,6 +252,7 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C {Name: "peerdb"}, }, }, + Settings: settings, DialTimeout: 3600 * time.Second, ReadTimeout: 3600 * time.Second, }) diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 79ff2aa7bb..9756761520 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -92,7 +92,7 @@ func (s ClickHouseSuite) Teardown() { } func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) { - ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) + ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) if err != nil { return nil, err } @@ -203,7 +203,7 @@ func SetupSuite(t *testing.T) ClickHouseSuite { s3Helper: s3Helper, } - ch, err := connclickhouse.Connect(context.Background(), s.PeerForDatabase("default").GetClickhouseConfig()) + ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig()) require.NoError(t, err, "failed to connect to clickhouse") err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix) require.NoError(t, err, "failed to create clickhouse database") diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 8b28573104..9c4fa2a167 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -505,7 +505,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) { }) e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) // now test weird names with rename based resync - ch, err := connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) + ch, err := connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) require.NoError(s.t, err) require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("DROP TABLE `%s`", dstTableName))) require.NoError(s.t, ch.Close()) @@ -523,7 +523,7 @@ func (s ClickHouseSuite) WeirdTable(tableName string) { }) e2e.EnvWaitForFinished(s.t, env, 3*time.Minute) // now test weird names with exchange based resync - ch, err = connclickhouse.Connect(context.Background(), s.Peer().GetClickhouseConfig()) + ch, err = connclickhouse.Connect(context.Background(), nil, s.Peer().GetClickhouseConfig()) require.NoError(s.t, err) require.NoError(s.t, ch.Exec(context.Background(), fmt.Sprintf("TRUNCATE TABLE `%s`", dstTableName))) require.NoError(s.t, ch.Close()) diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 566c8ead11..f3c2de0979 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -172,6 +172,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_CLICKHOUSE, }, + { + Name: "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS", + Description: "Configures max_insert_threads setting on clickhouse for inserting into destination table. Setting left unset when 0", + DefaultValue: "0", + ValueType: protos.DynconfValueType_UINT, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, + TargetForSetting: protos.DynconfTarget_CLICKHOUSE, + }, { Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES", Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely", @@ -362,6 +370,10 @@ func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]str return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE") } +func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error) { + return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS") +} + func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) { return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM") }