diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 63ccea693..f024a767e 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -228,11 +228,12 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou tlsSetting.RootCAs = caPool } - var settings clickhouse.Settings + // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency + settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) } else if maxInsertThreads != 0 { - settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads} + settings["max_insert_threads"] = maxInsertThreads } conn, err := clickhouse.Open(&clickhouse.Options{