Skip to content

Commit

Permalink
PEERDB_CLICKHOUSE_MAX_INSERT_THREADS
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 14, 2024
1 parent 42b0208 commit da8cd2c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
10 changes: 9 additions & 1 deletion flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func NewClickHouseConnector(
return connector, nil
}

func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
Expand All @@ -228,6 +228,13 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
tlsSetting.RootCAs = caPool
}

var settings clickhouse.Settings
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config")
} else if maxInsertThreads != 0 {
settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads}
}

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", config.Host, config.Port)},
Auth: clickhouse.Auth{
Expand All @@ -245,6 +252,7 @@ func Connect(ctx context.Context, config *protos.ClickhouseConfig) (clickhouse.C
{Name: "peerdb"},
},
},
Settings: settings,
DialTimeout: 3600 * time.Second,
ReadTimeout: 3600 * time.Second,
})
Expand Down
12 changes: 12 additions & 0 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS",
Description: "Configures max_insert_threads setting on clickhouse for inserting into destination table",
DefaultValue: "0",
ValueType: protos.DynconfValueType_UINT,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_INTERVAL_SINCE_LAST_NORMALIZE_THRESHOLD_MINUTES",
Description: "Duration in minutes since last normalize to start alerting, 0 disables all alerting entirely",
Expand Down Expand Up @@ -362,6 +370,10 @@ func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]str
return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE")
}

func PeerDBClickHouseMaxInsertThreads(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_CLICKHOUSE_MAX_INSERT_THREADS")
}

func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM")
}
Expand Down

0 comments on commit da8cd2c

Please sign in to comment.